前言
Redis 提供了多种数据结构,可以有效地实现延时队列功能。下面介绍 Redis 的 List 和 Sorted Set。
List 是一个简单的有序列表,适合需要保持插入顺序并可能包含重复元素的场景,如消息队列、最新动态列表。其特点是操作两端非常高效,并支持阻塞操作。
Sorted Set 是一个分数排序的集合,适合需要按某种权重或时间戳排序的场景,如排行榜、优先级队列。其特点是元素唯一且有序,支持高效的范围查询。
特性 | Sorted Set | List |
---|---|---|
排序方式 | 按分数排序 | 按插入顺序 |
元素唯一性 | 成员唯一 | 允许重复 |
随机访问 | O(log N) | O(N) |
两端操作 | 需要通过范围查询 | O(1) |
范围查询 | 非常高效 | 较高效 |
内存占用 | 相对较高 | 相对较低 |
使用场景 | 排行榜、优先级队列 | 消息队列、最新列表 |
阻塞操作 | 不支持 | 支持 (BLPOP/BRPOP) |
List (列表)
基本概念
List 是 Redis 中的有序字符串列表,基于双向链表实现:
支持根据索引获取元素
支持在列表的两端进行操作
列表中的元素可以重复
保持元素的插入顺序
主要命令
基本添加/删除操作
LPUSH key element [element ...]
:从左端添加元素RPUSH key element [element ...]
:从右端添加元素LPOP key
:从左端弹出元素RPOP key
:从右端弹出元素查询操作
LLEN key
:获取列表长度LRANGE key start stop
:获取指定范围内的元素LINDEX key index
:获取指定索引的元素修改操作
LSET key index element
:设置指定索引的元素值LTRIM key start stop
:只保留指定范围内的元素阻塞操作
BLPOP key [key ...] timeout
:阻塞式左端弹出BRPOP key [key ...] timeout
:阻塞式右端弹出BRPOPLPUSH source destination timeout
:阻塞式右端弹出并左端插入移动操作
RPOPLPUSH source destination
:从一个列表右端弹出并添加到另一个列表左端
适用场景
消息队列:生产者-消费者模式
最新动态:如最新评论、最新文章等
任务队列:简单的任务调度
实现栈或队列:可以用作后进先出(栈)或先进先出(队列)数据结构
分页功能:配合 LRANGE 实现分页显示
时间复杂度
从两端添加或移除元素: O(1)
获取列表长度: O(1)
通过索引访问元素: O(N),N 是到达索引的距离
范围查询: O(S+N),S 是起始位置偏移,N 是范围长度
List 应用示例(Python)
import redis r = redis.Redis(host='localhost', port=6379, db=0) # 简单消息队列 def send_message(queue_name, message): """发送消息""" r.rpush(queue_name, message) def receive_message(queue_name, timeout=0): """接收消息,支持阻塞等待""" if timeout > 0: result = r.blpop(queue_name, timeout) return result[1] if result else None else: return r.lpop(queue_name) # 最近活动记录 def add_activity(user_id, activity): """添加用户活动""" key = f"user:{user_id}:activities" r.lpush(key, activity) # 只保留最近的50条记录 r.ltrim(key, 0, 49) def get_recent_activities(user_id, count=10): """获取用户最近的活动""" return r.lrange(f"user:{user_id}:activities", 0, count-1)
Sorted Set (有序集合)
基本概念
Sorted Set 是 Redis 中一种有序的集合类型,它同时具备了「集合」和「有序」的特点:
每个元素都由一个成员(member)和一个分数(score)组成
成员是唯一的,不能重复
元素按照分数值进行排序(从小到大)
分数可以是浮点数,支持精确的排序
主要命令
基本操作
ZADD key score member [score member ...]
:添加成员和分数ZREM key member [member ...]
:移除成员ZSCORE key member
:获取成员的分数ZCARD key
:获取成员数量范围查询
ZRANGE key start stop [WITHSCORES]
:获取指定排名范围的成员ZREVRANGE key start stop [WITHSCORES]
:获取指定排名范围的成员(降序)ZRANGEBYSCORE key min max [WITHSCORES]
:获取指定分数范围的成员ZREVRANGEBYSCORE key max min [WITHSCORES]
:获取指定分数范围的成员(降序)排名操作
ZRANK key member
:获取成员的排名ZREVRANK key member
:获取成员的排名(降序)计数操作
ZCOUNT key min max
:统计指定分数范围内的成员数量ZLEXCOUNT key min max
:计算指定字典区间内成员数量集合操作
ZINTERSTORE
:交集ZUNIONSTORE
:并集
适用场景
排行榜:游戏分数排行榜、热门商品排名等
带权重的队列:优先级任务队列
延时任务:使用时间戳作为分数实现延时队列
范围查询:按照某种度量指标进行范围过滤
社交应用:如关注/粉丝列表按照活跃度排序
时间复杂度
添加/更新元素: O(log(N))
删除元素: O(log(N))
按分数/排名范围查找: O(log(N)+M),其中 M 为返回的元素数
Sorted Set 应用示例(Python)
import redis import time r = redis.Redis(host='localhost', port=6379, db=0) # 商品热门度排行榜 def add_product_view(product_id, count=1): """记录商品被查看次数""" r.zincrby('product:views', count, product_id) def get_hot_products(limit=10): """获取热门商品""" return r.zrevrange('product:views', 0, limit-1, withscores=True) # 延时任务队列 def add_task(task_data, execute_time): """添加一个延时任务""" r.zadd('delayed:tasks', {task_data: execute_time}) def get_due_tasks(): """获取所有到期的任务""" now = time.time() tasks = r.zrangebyscore('delayed:tasks', 0, now) if tasks: r.zremrangebyscore('delayed:tasks', 0, now) return tasks
场景 1:使用 Redis 的 Sorted Set 实现延时队列
Sorted Set 的特点是每个元素都关联一个分数(score),非常适合用作延时队列,我们可以将执行时间戳作为分数。
import redis import time import json from datetime import datetime, timedelta class DelayedQueue: def __init__(self, queue_name, host='localhost', port=6379, db=0): self.client = redis.Redis(host=host, port=port, db=db) self.queue_name = queue_name def add_task(self, task, delay_seconds): """添加一个延迟任务""" # 计算任务执行时间戳 execute_time = time.time() + delay_seconds task_data = { 'id': str(time.time()), 'data': task, 'added_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 将任务添加到有序集合,分数为执行时间 self.client.zadd(self.queue_name, {json.dumps(task_data): execute_time}) print(f"任务已添加,将在 {delay_seconds} 秒后执行") def poll_tasks(self): """获取所有到期的任务""" current_time = time.time() # 获取所有分数小于等于当前时间的任务 tasks = self.client.zrangebyscore(self.queue_name, 0, current_time) if not tasks: return [] # 移除这些任务 pipeline = self.client.pipeline() for task in tasks: pipeline.zrem(self.queue_name, task) pipeline.execute() # 返回任务数据 return [json.loads(task.decode('utf-8')) for task in tasks] def get_queue_length(self): """获取队列长度""" return self.client.zcard(self.queue_name) def get_delayed_tasks(self): """获取所有延迟任务""" tasks = self.client.zrange(self.queue_name, 0, -1, withscores=True) result = [] for task, score in tasks: task_data = json.loads(task.decode('utf-8')) task_data['execute_at'] = datetime.fromtimestamp(score).strftime('%Y-%m-%d %H:%M:%S') result.append(task_data) return result # 使用示例 if __name__ == "__main__": queue = DelayedQueue("my_delayed_queue") # 添加几个测试任务 queue.add_task("发送邮件任务", 5) # 5秒后执行 queue.add_task("数据库备份任务", 10) # 10秒后执行 queue.add_task("日志清理任务", 15) # 15秒后执行 print(f"当前队列长度: {queue.get_queue_length()}") print("所有延迟任务:") for task in queue.get_delayed_tasks(): print(f" - {task['data']}, 执行时间: {task['execute_at']}") # 模拟消费者循环 print("\n开始消费任务...") try: while queue.get_queue_length() > 0: tasks = queue.poll_tasks() if tasks: for task in tasks: print(f"执行任务: {task['data']} (添加于 {task['added_at']})") time.sleep(1) # 每秒检查一次 except KeyboardInterrupt: print("程序已停止")
场景 2:使用 Redis 的 List + Sorted Set 实现可靠的延时队列
这种方法更加可靠,同时支持任务的优先级和重试机制。
import redis import time import json import uuid from datetime import datetime, timedelta class ReliableDelayedQueue: def __init__(self, queue_name, host='localhost', port=6379, db=0): self.client = redis.Redis(host=host, port=port, db=db) self.delay_queue = f"{queue_name}:delayed" # 存储延迟任务的有序集合 self.ready_queue = f"{queue_name}:ready" # 存储准备好执行的任务的列表 self.processing = f"{queue_name}:processing" # 正在处理的任务 def add_task(self, task, delay_seconds, priority=0): """添加一个延迟任务,支持优先级""" task_id = str(uuid.uuid4()) execute_time = time.time() + delay_seconds task_data = { 'id': task_id, 'data': task, 'priority': priority, 'added_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'retry_count': 0 } # 将任务添加到有序集合,分数为执行时间 self.client.zadd(self.delay_queue, {json.dumps(task_data): execute_time}) return task_id def move_to_ready_queue(self): """将到期的延迟任务移动到准备队列""" current_time = time.time() # 获取所有分数小于等于当前时间的任务 tasks = self.client.zrangebyscore(self.delay_queue, 0, current_time) if not tasks: return 0 pipeline = self.client.pipeline() for task in tasks: # 从延迟队列中移除并添加到准备队列 pipeline.zrem(self.delay_queue, task) pipeline.lpush(self.ready_queue, task) pipeline.execute() return len(tasks) def get_task(self, timeout=0): """获取一个准备好的任务,支持阻塞等待""" # 首先检查是否有到期的延迟任务需要移动 self.move_to_ready_queue() # 尝试从准备队列获取任务 task_data = self.client.brpop(self.ready_queue, timeout) if not task_data: return None # 解析任务数据 task = json.loads(task_data[1].decode('utf-8')) # 将任务添加到处理中集合,设置1小时的过期时间 self.client.setex(f"{self.processing}:{task['id']}", 3600, task_data[1]) return task def complete_task(self, task_id): """标记任务为完成""" self.client.delete(f"{self.processing}:{task_id}") def retry_task(self, task_id, delay_seconds=60): """重试失败的任务""" # 获取任务数据 task_data_raw = self.client.get(f"{self.processing}:{task_id}") if not task_data_raw: return False task_data = json.loads(task_data_raw.decode('utf-8')) task_data['retry_count'] += 1 # 重新添加到延迟队列 execute_time = time.time() + delay_seconds self.client.zadd(self.delay_queue, {json.dumps(task_data): execute_time}) # 从处理中移除 self.client.delete(f"{self.processing}:{task_id}") return True def get_stats(self): """获取队列状态统计""" return { 'delayed': self.client.zcard(self.delay_queue), 'ready': self.client.llen(self.ready_queue), 'processing': len(self.client.keys(f"{self.processing}:*")) } # 使用示例 if __name__ == "__main__": queue = ReliableDelayedQueue("jobs") # 添加测试任务 print("添加测试任务...") task1_id = queue.add_task("高优先级任务", 3, priority=10) task2_id = queue.add_task("普通任务", 5) task3_id = queue.add_task("低优先级任务", 7, priority=-5) print(f"队列状态: {queue.get_stats()}") # 模拟消费者 print("\n模拟任务消费...") try: while True: print(f"当前队列状态: {queue.get_stats()}") task = queue.get_task(timeout=1) if task: print(f"处理任务: {task['data']} (优先级: {task['priority']}, 重试次数: {task['retry_count']})") # 模拟任务处理 time.sleep(1) # 随机决定是完成还是重试任务 import random if random.random() > 0.3: # 70% 成功率 print(f"任务 {task['id']} 完成") queue.complete_task(task['id']) else: print(f"任务 {task['id']} 失败,安排重试") queue.retry_task(task['id'], delay_seconds=3) else: if queue.get_stats()['delayed'] == 0 and queue.get_stats()['ready'] == 0 and queue.get_stats()['processing'] == 0: print("所有任务已处理完成") break time.sleep(1) except KeyboardInterrupt: print("程序已停止")
总结
场景 1、2,提供了两种使用 Redis 实现延时队列的方法:
简单版本:使用 Sorted Set 实现基本的延时功能,适合简单场景。
可靠版本:结合 List 和 Sorted Set 实现更完善的延时队列,包括:
任务优先级支持
任务状态追踪
失败任务重试机制
阻塞式消费