前言

Redis 提供了多种数据结构,可以有效地实现延时队列功能。下面介绍 Redis 的 List 和 Sorted Set。

  • List 是一个简单的有序列表,适合需要保持插入顺序并可能包含重复元素的场景,如消息队列、最新动态列表。其特点是操作两端非常高效,并支持阻塞操作。

  • Sorted Set 是一个分数排序的集合,适合需要按某种权重或时间戳排序的场景,如排行榜、优先级队列。其特点是元素唯一且有序,支持高效的范围查询。

特性Sorted SetList
排序方式按分数排序按插入顺序
元素唯一性成员唯一允许重复
随机访问O(log N)O(N)
两端操作需要通过范围查询O(1)
范围查询非常高效较高效
内存占用相对较高相对较低
使用场景排行榜、优先级队列消息队列、最新列表
阻塞操作不支持支持 (BLPOP/BRPOP)

List (列表)

基本概念

List 是 Redis 中的有序字符串列表,基于双向链表实现:

  • 支持根据索引获取元素

  • 支持在列表的两端进行操作

  • 列表中的元素可以重复

  • 保持元素的插入顺序

主要命令

  1. 基本添加/删除操作

    • LPUSH key element [element ...]:从左端添加元素

    • RPUSH key element [element ...]:从右端添加元素

    • LPOP key:从左端弹出元素

    • RPOP key:从右端弹出元素

  2. 查询操作

    • LLEN key:获取列表长度

    • LRANGE key start stop:获取指定范围内的元素

    • LINDEX key index:获取指定索引的元素

  3. 修改操作

    • LSET key index element:设置指定索引的元素值

    • LTRIM key start stop:只保留指定范围内的元素

  4. 阻塞操作

    • BLPOP key [key ...] timeout:阻塞式左端弹出

    • BRPOP key [key ...] timeout:阻塞式右端弹出

    • BRPOPLPUSH source destination timeout:阻塞式右端弹出并左端插入

  5. 移动操作

    • RPOPLPUSH source destination:从一个列表右端弹出并添加到另一个列表左端

适用场景

  1. 消息队列:生产者-消费者模式

  2. 最新动态:如最新评论、最新文章等

  3. 任务队列:简单的任务调度

  4. 实现栈或队列:可以用作后进先出(栈)或先进先出(队列)数据结构

  5. 分页功能:配合 LRANGE 实现分页显示

时间复杂度

  • 从两端添加或移除元素: O(1)

  • 获取列表长度: O(1)

  • 通过索引访问元素: O(N),N 是到达索引的距离

  • 范围查询: O(S+N),S 是起始位置偏移,N 是范围长度

List 应用示例(Python)

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 简单消息队列
def send_message(queue_name, message):
    """发送消息"""
    r.rpush(queue_name, message)

def receive_message(queue_name, timeout=0):
    """接收消息,支持阻塞等待"""
    if timeout > 0:
        result = r.blpop(queue_name, timeout)
        return result[1] if result else None
    else:
        return r.lpop(queue_name)

# 最近活动记录
def add_activity(user_id, activity):
    """添加用户活动"""
    key = f"user:{user_id}:activities"
    r.lpush(key, activity)
    # 只保留最近的50条记录
    r.ltrim(key, 0, 49)

def get_recent_activities(user_id, count=10):
    """获取用户最近的活动"""
    return r.lrange(f"user:{user_id}:activities", 0, count-1)

Sorted Set (有序集合)

基本概念

Sorted Set 是 Redis 中一种有序的集合类型,它同时具备了「集合」和「有序」的特点:

  • 每个元素都由一个成员(member)和一个分数(score)组成

  • 成员是唯一的,不能重复

  • 元素按照分数值进行排序(从小到大)

  • 分数可以是浮点数,支持精确的排序

主要命令

  1. 基本操作

    • ZADD key score member [score member ...]:添加成员和分数

    • ZREM key member [member ...]:移除成员

    • ZSCORE key member:获取成员的分数

    • ZCARD key:获取成员数量

  2. 范围查询

    • ZRANGE key start stop [WITHSCORES]:获取指定排名范围的成员

    • ZREVRANGE key start stop [WITHSCORES]:获取指定排名范围的成员(降序)

    • ZRANGEBYSCORE key min max [WITHSCORES]:获取指定分数范围的成员

    • ZREVRANGEBYSCORE key max min [WITHSCORES]:获取指定分数范围的成员(降序)

  3. 排名操作

    • ZRANK key member:获取成员的排名

    • ZREVRANK key member:获取成员的排名(降序)

  4. 计数操作

    • ZCOUNT key min max:统计指定分数范围内的成员数量

    • ZLEXCOUNT key min max:计算指定字典区间内成员数量

  5. 集合操作

    • ZINTERSTORE:交集

    • ZUNIONSTORE:并集

适用场景

  1. 排行榜:游戏分数排行榜、热门商品排名等

  2. 带权重的队列:优先级任务队列

  3. 延时任务:使用时间戳作为分数实现延时队列

  4. 范围查询:按照某种度量指标进行范围过滤

  5. 社交应用:如关注/粉丝列表按照活跃度排序

时间复杂度

  • 添加/更新元素: O(log(N))

  • 删除元素: O(log(N))

  • 按分数/排名范围查找: O(log(N)+M),其中 M 为返回的元素数

Sorted Set 应用示例(Python)

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# 商品热门度排行榜
def add_product_view(product_id, count=1):
    """记录商品被查看次数"""
    r.zincrby('product:views', count, product_id)

def get_hot_products(limit=10):
    """获取热门商品"""
    return r.zrevrange('product:views', 0, limit-1, withscores=True)

# 延时任务队列
def add_task(task_data, execute_time):
    """添加一个延时任务"""
    r.zadd('delayed:tasks', {task_data: execute_time})

def get_due_tasks():
    """获取所有到期的任务"""
    now = time.time()
    tasks = r.zrangebyscore('delayed:tasks', 0, now)
    if tasks:
        r.zremrangebyscore('delayed:tasks', 0, now)
    return tasks

场景 1:使用 Redis 的 Sorted Set 实现延时队列

Sorted Set 的特点是每个元素都关联一个分数(score),非常适合用作延时队列,我们可以将执行时间戳作为分数。

import redis
import time
import json
from datetime import datetime, timedelta

class DelayedQueue:
    def __init__(self, queue_name, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
        self.queue_name = queue_name
        
    def add_task(self, task, delay_seconds):
        """添加一个延迟任务"""
        # 计算任务执行时间戳
        execute_time = time.time() + delay_seconds
        task_data = {
            'id': str(time.time()),
            'data': task,
            'added_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        # 将任务添加到有序集合,分数为执行时间
        self.client.zadd(self.queue_name, {json.dumps(task_data): execute_time})
        print(f"任务已添加,将在 {delay_seconds} 秒后执行")
        
    def poll_tasks(self):
        """获取所有到期的任务"""
        current_time = time.time()
        # 获取所有分数小于等于当前时间的任务
        tasks = self.client.zrangebyscore(self.queue_name, 0, current_time)
        
        if not tasks:
            return []
            
        # 移除这些任务
        pipeline = self.client.pipeline()
        for task in tasks:
            pipeline.zrem(self.queue_name, task)
        pipeline.execute()
        
        # 返回任务数据
        return [json.loads(task.decode('utf-8')) for task in tasks]
    
    def get_queue_length(self):
        """获取队列长度"""
        return self.client.zcard(self.queue_name)
    
    def get_delayed_tasks(self):
        """获取所有延迟任务"""
        tasks = self.client.zrange(self.queue_name, 0, -1, withscores=True)
        result = []
        for task, score in tasks:
            task_data = json.loads(task.decode('utf-8'))
            task_data['execute_at'] = datetime.fromtimestamp(score).strftime('%Y-%m-%d %H:%M:%S')
            result.append(task_data)
        return result


# 使用示例
if __name__ == "__main__":
    queue = DelayedQueue("my_delayed_queue")
    
    # 添加几个测试任务
    queue.add_task("发送邮件任务", 5)  # 5秒后执行
    queue.add_task("数据库备份任务", 10)  # 10秒后执行
    queue.add_task("日志清理任务", 15)  # 15秒后执行
    
    print(f"当前队列长度: {queue.get_queue_length()}")
    print("所有延迟任务:")
    for task in queue.get_delayed_tasks():
        print(f"  - {task['data']}, 执行时间: {task['execute_at']}")
    
    # 模拟消费者循环
    print("\n开始消费任务...")
    try:
        while queue.get_queue_length() > 0:
            tasks = queue.poll_tasks()
            if tasks:
                for task in tasks:
                    print(f"执行任务: {task['data']} (添加于 {task['added_at']})")
            time.sleep(1)  # 每秒检查一次
    except KeyboardInterrupt:
        print("程序已停止")

场景 2:使用 Redis 的 List + Sorted Set 实现可靠的延时队列

这种方法更加可靠,同时支持任务的优先级和重试机制。

import redis
import time
import json
import uuid
from datetime import datetime, timedelta

class ReliableDelayedQueue:
    def __init__(self, queue_name, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
        self.delay_queue = f"{queue_name}:delayed"  # 存储延迟任务的有序集合
        self.ready_queue = f"{queue_name}:ready"    # 存储准备好执行的任务的列表
        self.processing = f"{queue_name}:processing"  # 正在处理的任务
        
    def add_task(self, task, delay_seconds, priority=0):
        """添加一个延迟任务,支持优先级"""
        task_id = str(uuid.uuid4())
        execute_time = time.time() + delay_seconds
        
        task_data = {
            'id': task_id,
            'data': task,
            'priority': priority,
            'added_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'retry_count': 0
        }
        
        # 将任务添加到有序集合,分数为执行时间
        self.client.zadd(self.delay_queue, {json.dumps(task_data): execute_time})
        return task_id
    
    def move_to_ready_queue(self):
        """将到期的延迟任务移动到准备队列"""
        current_time = time.time()
        # 获取所有分数小于等于当前时间的任务
        tasks = self.client.zrangebyscore(self.delay_queue, 0, current_time)
        
        if not tasks:
            return 0
            
        pipeline = self.client.pipeline()
        for task in tasks:
            # 从延迟队列中移除并添加到准备队列
            pipeline.zrem(self.delay_queue, task)
            pipeline.lpush(self.ready_queue, task)
        pipeline.execute()
        
        return len(tasks)
    
    def get_task(self, timeout=0):
        """获取一个准备好的任务,支持阻塞等待"""
        # 首先检查是否有到期的延迟任务需要移动
        self.move_to_ready_queue()
        
        # 尝试从准备队列获取任务
        task_data = self.client.brpop(self.ready_queue, timeout)
        if not task_data:
            return None
            
        # 解析任务数据
        task = json.loads(task_data[1].decode('utf-8'))
        
        # 将任务添加到处理中集合,设置1小时的过期时间
        self.client.setex(f"{self.processing}:{task['id']}", 3600, task_data[1])
        
        return task
    
    def complete_task(self, task_id):
        """标记任务为完成"""
        self.client.delete(f"{self.processing}:{task_id}")
    
    def retry_task(self, task_id, delay_seconds=60):
        """重试失败的任务"""
        # 获取任务数据
        task_data_raw = self.client.get(f"{self.processing}:{task_id}")
        if not task_data_raw:
            return False
            
        task_data = json.loads(task_data_raw.decode('utf-8'))
        task_data['retry_count'] += 1
        
        # 重新添加到延迟队列
        execute_time = time.time() + delay_seconds
        self.client.zadd(self.delay_queue, {json.dumps(task_data): execute_time})
        
        # 从处理中移除
        self.client.delete(f"{self.processing}:{task_id}")
        return True
    
    def get_stats(self):
        """获取队列状态统计"""
        return {
            'delayed': self.client.zcard(self.delay_queue),
            'ready': self.client.llen(self.ready_queue),
            'processing': len(self.client.keys(f"{self.processing}:*"))
        }


# 使用示例
if __name__ == "__main__":
    queue = ReliableDelayedQueue("jobs")
    
    # 添加测试任务
    print("添加测试任务...")
    task1_id = queue.add_task("高优先级任务", 3, priority=10)
    task2_id = queue.add_task("普通任务", 5)
    task3_id = queue.add_task("低优先级任务", 7, priority=-5)
    
    print(f"队列状态: {queue.get_stats()}")
    
    # 模拟消费者
    print("\n模拟任务消费...")
    try:
        while True:
            print(f"当前队列状态: {queue.get_stats()}")
            task = queue.get_task(timeout=1)
            
            if task:
                print(f"处理任务: {task['data']} (优先级: {task['priority']}, 重试次数: {task['retry_count']})")
                
                # 模拟任务处理
                time.sleep(1)
                
                # 随机决定是完成还是重试任务
                import random
                if random.random() > 0.3:  # 70% 成功率
                    print(f"任务 {task['id']} 完成")
                    queue.complete_task(task['id'])
                else:
                    print(f"任务 {task['id']} 失败,安排重试")
                    queue.retry_task(task['id'], delay_seconds=3)
                    
            else:
                if queue.get_stats()['delayed'] == 0 and queue.get_stats()['ready'] == 0 and queue.get_stats()['processing'] == 0:
                    print("所有任务已处理完成")
                    break
                time.sleep(1)
                    
    except KeyboardInterrupt:
        print("程序已停止")

总结

场景 1、2,提供了两种使用 Redis 实现延时队列的方法:

  1. 简单版本:使用 Sorted Set 实现基本的延时功能,适合简单场景。

  2. 可靠版本:结合 List 和 Sorted Set 实现更完善的延时队列,包括:

    • 任务优先级支持

    • 任务状态追踪

    • 失败任务重试机制

    • 阻塞式消费