一、消息队列基础概念

(一)什么是消息队列

消息队列(Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步通信协议,消息的发送者和接收者不需要同时与消息队列交互。

📌 核心概念

  • 生产者:发送消息到队列的应用程序
  • 消费者:从队列接收并处理消息的应用程序
  • 消息:在应用程序之间传递的数据单元
  • 队列:存储消息的缓冲区

(二)Redis实现消息队列的优势

相比传统的消息队列系统(如RabbitMQ、Kafka),Redis实现轻量级消息队列具有以下优势:

  • 部署简单:Redis本身已部署,无需额外安装消息队列服务
  • 性能极高:基于内存操作,读写性能优异
  • 功能丰富:支持多种数据结构和消息模式
  • 学习成本低:开发者已熟悉Redis操作
  • 轻量级:适合中小型项目或微服务架构

二、Redis消息队列实现方案

(一)基于List的队列实现

1. 简单队列模式

使用Redis的List数据结构,通过LPUSHRPOP命令实现先进先出(FIFO)队列:

Python实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import redis
import json
import time
from typing import Dict, Any, Optional

class RedisSimpleQueue:
"""
基于Redis List实现的简单消息队列
特点:先进先出,无ack机制,消息可能丢失
"""

def __init__(self, redis_client: redis.Redis, queue_name: str):
self.redis = redis_client
self.queue_name = queue_name

def enqueue(self, message: Dict[str, Any]) -> int:
"""
将消息加入队列尾部

Args:
message: 要发送的消息,字典格式

Returns:
队列当前长度
"""
# 将字典序列化为JSON字符串存储
message_str = json.dumps(message, ensure_ascii=False)
return self.redis.lpush(self.queue_name, message_str)

def dequeue(self, timeout: int = 0) -> Optional[Dict[str, Any]]:
"""
从队列头部获取消息

Args:
timeout: 阻塞等待的超时时间(秒),0表示不阻塞

Returns:
消息字典,如果超时返回None
"""
if timeout > 0:
# 使用BRPOP实现阻塞式获取,避免轮询
result = self.redis.brpop([self.queue_name], timeout=timeout)
if result:
_, message_str = result
return json.loads(message_str)
else:
# 非阻塞式获取
message_str = self.redis.rpop(self.queue_name)
if message_str:
return json.loads(message_str)
return None

def size(self) -> int:
"""获取队列长度"""
return self.redis.llen(self.queue_name)

def clear(self) -> int:
"""清空队列"""
return self.redis.delete(self.queue_name)

# 使用示例
if __name__ == "__main__":
# 创建Redis连接
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)

# 创建队列实例
queue = RedisSimpleQueue(redis_client, "order_queue")

# 生产者:发送消息
order_message = {
"order_id": "202412190001",
"user_id": 1001,
"product_id": 2001,
"quantity": 2,
"timestamp": time.time()
}
queue.enqueue(order_message)
print(f"消息已发送,队列长度:{queue.size()}")

# 消费者:接收消息
message = queue.dequeue(timeout=5)
if message:
print(f"收到消息:{message}")

2. 可靠队列模式(带ACK机制)

为了解决消息可能丢失的问题,引入备份队列和确认机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import uuid
import time
from typing import Dict, Any, Optional, List

class RedisReliableQueue:
"""
基于Redis List实现的可靠消息队列
特点:带ACK机制,消息处理失败可重试,避免消息丢失
"""

def __init__(self, redis_client: redis.Redis, queue_name: str):
self.redis = redis_client
self.queue_name = queue_name
self.processing_queue = f"{queue_name}:processing"
self.retry_queue = f"{queue_name}:retry"
self.consumer_id = str(uuid.uuid4())

def enqueue(self, message: Dict[str, Any], priority: int = 0) -> int:
"""
发送消息到队列

Args:
message: 消息内容
priority: 优先级,数值越大优先级越高
"""
message['__id'] = str(uuid.uuid4())
message['__retry_count'] = 0
message['__priority'] = priority
message['__enqueue_time'] = time.time()

message_str = json.dumps(message, ensure_ascii=False)

if priority > 0:
# 高优先级消息插入到队列前面
return self.redis.lpush(self.queue_name, message_str)
else:
return self.redis.rpush(self.queue_name, message_str)

def dequeue(self, timeout: int = 5) -> Optional[Dict[str, Any]]:
"""
获取消息并转移到处理中队列
"""
# 使用RPOPLPUSH原子操作,确保消息不丢失
message_str = self.redis.brpoplpush(
self.queue_name,
self.processing_queue,
timeout
)

if message_str:
message = json.loads(message_str)
message['__consumer_id'] = self.consumer_id
message['__dequeue_time'] = time.time()
return message
return None

def ack(self, message: Dict[str, Any]) -> bool:
"""
确认消息处理完成,从处理中队列移除
"""
message_str = json.dumps(message, ensure_ascii=False)
return self.redis.lrem(self.processing_queue, 0, message_str) > 0

def nack(self, message: Dict[str, Any], requeue: bool = True) -> bool:
"""
消息处理失败,可选择重新入队或丢弃
"""
# 从处理中队列移除
message_str = json.dumps(message, ensure_ascii=False)
removed = self.redis.lrem(self.processing_queue, 0, message_str)

if removed > 0 and requeue:
# 增加重试计数
message['__retry_count'] += 1

# 根据重试次数设置延迟时间(指数退避)
delay = min(60, 2 ** message['__retry_count'])
message['__delay_until'] = time.time() + delay

# 重新入队到重试队列
retry_str = json.dumps(message, ensure_ascii=False)
self.redis.rpush(self.retry_queue, retry_str)

return removed > 0

def get_pending_messages(self) -> List[Dict[str, Any]]:
"""获取处理中的消息列表"""
messages = self.redis.lrange(self.processing_queue, 0, -1)
return [json.loads(msg) for msg in messages]

def check_timeout_messages(self, timeout_seconds: int = 300) -> List[Dict[str, Any]]:
"""检查超时未确认的消息"""
timeout_messages = []
messages = self.get_pending_messages()

current_time = time.time()
for msg in messages:
dequeue_time = msg.get('__dequeue_time', 0)
if current_time - dequeue_time > timeout_seconds:
timeout_messages.append(msg)

return timeout_messages

(二)基于Pub/Sub的发布订阅模式

1. 基本发布订阅实现

Redis的发布订阅模式适合实时消息推送,但不保证消息可靠性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import threading
from typing import Callable, Any

class RedisPubSub:
"""
基于Redis Pub/Sub的发布订阅实现
特点:实时推送,但不持久化,不保证消息可达性
"""

def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.subscribers = {}

def publish(self, channel: str, message: Dict[str, Any]) -> int:
"""
发布消息到指定频道

Args:
channel: 频道名称
message: 消息内容

Returns:
接收消息的客户端数量
"""
message_str = json.dumps(message, ensure_ascii=False)
return self.redis.publish(channel, message_str)

def subscribe(self, channel: str, callback: Callable[[Dict[str, Any]], None]):
"""
订阅频道消息

Args:
channel: 频道名称
callback: 消息处理回调函数
"""
def message_handler():
pubsub = self.redis.pubsub()
pubsub.subscribe(channel)

for message in pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
callback(data)
except json.JSONDecodeError:
print(f"消息解析失败:{message['data']}")

# 在新线程中运行订阅器
thread = threading.Thread(target=message_handler, daemon=True)
thread.start()
self.subscribers[channel] = thread

def unsubscribe(self, channel: str):
"""取消订阅"""
if channel in self.subscribers:
# 注意:这里需要更优雅的方式来停止线程
pass

# 使用示例
if __name__ == "__main__":
redis_client = redis.Redis(host='localhost', port=6379, db=0)
pubsub = RedisPubSub(redis_client)

# 订阅者
def handle_notification(message):
print(f"收到通知:{message}")

pubsub.subscribe("user_notifications", handle_notification)

# 发布者
notification = {
"type": "email",
"recipient": "user@example.com",
"subject": "订单确认",
"content": "您的订单已确认"
}

time.sleep(1) # 等待订阅线程启动
pubsub.publish("user_notifications", notification)

(三)基于Stream的流式队列

1. Redis 5.0+ Stream实现

Redis Stream提供了更强大的消息队列功能,支持消费者组、消息持久化等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from typing import Dict, Any, List, Optional

class RedisStreamQueue:
"""
基于Redis Stream的消息队列实现
特点:支持消费者组、消息持久化、消息确认、阻塞读取
"""

def __init__(self, redis_client: redis.Redis, stream_name: str):
self.redis = redis_client
self.stream_name = stream_name

def send_message(self, message: Dict[str, Any], max_len: int = 10000) -> str:
"""
发送消息到Stream

Args:
message: 消息内容
max_len: Stream最大长度,超出后旧消息会被删除

Returns:
消息ID
"""
message_id = self.redis.xadd(
self.stream_name,
message,
maxlen=max_len,
approximate=True # 使用近似长度,提高性能
)
return message_id.decode() if isinstance(message_id, bytes) else message_id

def create_consumer_group(self, group_name: str, start_id: str = "0") -> bool:
"""
创建消费者组

Args:
group_name: 消费者组名称
start_id: 起始消息ID,"0"表示从头开始,"$"表示只接收新消息
"""
try:
self.redis.xgroup_create(self.stream_name, group_name, start_id)
return True
except redis.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在
return True
raise e

def consume_messages(
self,
group_name: str,
consumer_name: str,
count: int = 10,
block: int = 5000
) -> List[Dict[str, Any]]:
"""
从消费者组获取消息

Args:
group_name: 消费者组名称
consumer_name: 消费者名称
count: 一次获取的消息数量
block: 阻塞等待时间(毫秒)

Returns:
消息列表,包含消息ID和内容
"""
messages = self.redis.xreadgroup(
group_name,
consumer_name,
{self.stream_name: ">"}, # ">"表示获取未分配给任何消费者的消息
count=count,
block=block
)

result = []
if messages:
for stream_name, stream_messages in messages:
for message_id, message_data in stream_messages:
result.append({
'id': message_id.decode() if isinstance(message_id, bytes) else message_id,
'data': {k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in message_data.items()}
})

return result

def ack_message(self, group_name: str, message_id: str) -> int:
"""
确认消息已处理

Args:
group_name: 消费者组名称
message_id: 消息ID

Returns:
成功确认的消息数量
"""
return self.redis.xack(self.stream_name, group_name, message_id)

def get_pending_messages(
self,
group_name: str,
consumer_name: str = None,
count: int = 100
) -> List[Dict[str, Any]]:
"""
获取待处理的消息
"""
if consumer_name:
pending = self.redis.xpending_range(
self.stream_name,
group_name,
"-", "+",
count,
consumername=consumer_name
)
else:
pending = self.redis.xpending_range(
self.stream_name,
group_name,
"-", "+",
count
)

return [
{
'id': item[0].decode() if isinstance(item[0], bytes) else item[0],
'consumer': item[1].decode() if isinstance(item[1], bytes) else item[1],
'idle_time': item[2],
'delivery_count': item[3]
}
for item in pending
]

def get_stream_info(self) -> Dict[str, Any]:
"""获取Stream信息"""
info = self.redis.xinfo_stream(self.stream_name)
return {
'length': info[b'length'] if b'length' in info else info.get('length', 0),
'radix_tree_keys': info[b'radix-tree-keys'] if b'radix-tree-keys' in info else info.get('radix-tree-keys', 0),
'groups': info[b'groups'] if b'groups' in info else info.get('groups', 0),
'last_generated_id': info[b'last-generated-id'] if b'last-generated-id' in info else info.get('last-generated-id', ''),
'first_entry': info[b'first-entry'] if b'first-entry' in info else info.get('first-entry'),
'last_entry': info[b'last-entry'] if b'last-entry' in info else info.get('last-entry')
}

# 使用示例
if __name__ == "__main__":
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 创建Stream队列
stream_queue = RedisStreamQueue(redis_client, "task_stream")

# 创建消费者组
stream_queue.create_consumer_group("task_processors", "$")

# 生产者发送任务
task = {
"type": "email",
"recipient": "user@example.com",
"subject": "欢迎邮件",
"content": "欢迎注册我们的服务!",
"priority": "high"
}

message_id = stream_queue.send_message(task)
print(f"发送消息:{message_id}")

# 消费者处理任务
messages = stream_queue.consume_messages("task_processors", "worker1", count=1)
for msg in messages:
print(f"处理消息:{msg}")
# 处理完成后确认
stream_queue.ack_message("task_processors", msg['id'])

三、实际应用场景与最佳实践

(一)订单处理系统

1. 系统架构设计

使用Redis Stream构建订单处理系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import asyncio
import aioredis
from datetime import datetime
from typing import Dict, Any

class OrderProcessor:
"""
订单处理系统
使用Redis Stream实现订单的异步处理
"""

def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis = None
self.stream_name = "order_events"
self.group_name = "order_processors"

async def initialize(self):
"""初始化Redis连接和消费者组"""
self.redis = await aioredis.create_redis_pool(self.redis_url)
try:
await self.redis.xgroup_create(
self.stream_name,
self.group_name,
mkstream=True # 如果Stream不存在则创建
)
except aioredis.ReplyError as e:
if "BUSYGROUP" not in str(e):
raise e

async def create_order(self, order_data: Dict[str, Any]) -> str:
"""
创建订单并发送到消息队列

Args:
order_data: 订单数据

Returns:
订单事件ID
"""
event = {
"type": "order_created",
"order_id": order_data["order_id"],
"user_id": order_data["user_id"],
"amount": order_data["amount"],
"items": json.dumps(order_data.get("items", [])),
"created_at": datetime.now().isoformat(),
"status": "pending"
}

message_id = await self.redis.xadd(self.stream_name, event)
return message_id.decode()

async def process_order_events(self, consumer_name: str):
"""
处理订单事件

Args:
consumer_name: 消费者名称(如:payment_service, inventory_service等)
"""
while True:
try:
# 读取消息
messages = await self.redis.xread_group(
self.group_name,
consumer_name,
[self.stream_name],
latest_ids=['>'],
count=10,
block=5000
)

for stream_name, stream_messages in messages:
for message_id, message_data in stream_messages:
await self.handle_order_event(message_id, message_data)

except Exception as e:
print(f"处理订单事件时出错:{e}")
await asyncio.sleep(1)

async def handle_order_event(self, message_id: bytes, message_data: Dict[bytes, bytes]):
"""处理单个订单事件"""
try:
# 转换消息数据
event_data = {k.decode(): v.decode() for k, v in message_data.items()}

event_type = event_data.get("type")
order_id = event_data.get("order_id")

if event_type == "order_created":
# 处理订单创建事件
await self.process_order_creation(order_id, event_data)
elif event_type == "payment_completed":
# 处理支付完成事件
await self.process_payment_completion(order_id, event_data)
elif event_type == "inventory_reserved":
# 处理库存预留事件
await self.process_inventory_reservation(order_id, event_data)

# 确认消息处理完成
await self.redis.xack(self.stream_name, self.group_name, message_id)

except Exception as e:
print(f"处理订单事件失败:{e}")
# 这里可以实现重试逻辑或死信队列

async def process_order_creation(self, order_id: str, event_data: Dict[str, str]):
"""处理订单创建逻辑"""
print(f"处理订单创建:{order_id}")

# 1. 验证订单数据
# 2. 调用支付服务
# 3. 发送库存预留事件

payment_event = {
"type": "payment_required",
"order_id": order_id,
"amount": event_data["amount"],
"user_id": event_data["user_id"]
}

await self.redis.xadd(self.stream_name, payment_event)

async def process_payment_completion(self, order_id: str, event_data: Dict[str, str]):
"""处理支付完成逻辑"""
print(f"处理支付完成:{order_id}")

# 更新订单状态为已支付
# 发送库存扣减事件

inventory_event = {
"type": "inventory_deduction",
"order_id": order_id,
"items": event_data.get("items", "[]")
}

await self.redis.xadd(self.stream_name, inventory_event)

async def process_inventory_reservation(self, order_id: str, event_data: Dict[str, str]):
"""处理库存预留逻辑"""
print(f"处理库存预留:{order_id}")

# 更新库存状态
# 发送订单确认通知

confirmation_event = {
"type": "order_confirmed",
"order_id": order_id,
"status": "confirmed"
}

await self.redis.xadd(self.stream_name, confirmation_event)

async def get_order_status(self, order_id: str) -> Dict[str, Any]:
"""获取订单状态"""
# 这里可以查询订单状态,示例返回模拟数据
return {
"order_id": order_id,
"status": "processing",
"created_at": datetime.now().isoformat()
}

(二)任务调度系统

1. 延迟任务实现

使用Redis的Sorted Set实现延迟任务调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import time
import threading
from typing import Dict, Any, Callable, Optional

class DelayedTaskScheduler:
"""
基于Redis Sorted Set的延迟任务调度器
使用ZSET存储任务,score为执行时间戳
"""

def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.task_key = "delayed_tasks"
self.running = False
self.worker_thread = None

def schedule_task(
self,
task_data: Dict[str, Any],
delay_seconds: int,
task_id: str = None
) -> str:
"""
调度延迟任务

Args:
task_data: 任务数据
delay_seconds: 延迟时间(秒)
task_id: 任务ID,如果不提供则自动生成

Returns:
任务ID
"""
if task_id is None:
task_id = str(uuid.uuid4())

task_data['__task_id'] = task_id
task_data['__scheduled_time'] = time.time()
task_data['__execute_time'] = time.time() + delay_seconds

task_str = json.dumps(task_data, ensure_ascii=False)

# 使用ZSET存储,score为执行时间戳
self.redis.zadd(self.task_key, {task_str: task_data['__execute_time']})

return task_id

def get_ready_tasks(self) -> List[Dict[str, Any]]:
"""获取到期的任务"""
current_time = time.time()

# 获取score小于当前时间的所有任务
tasks = self.redis.zrangebyscore(
self.task_key,
0,
current_time,
withscores=False
)

return [json.loads(task) for task in tasks]

def remove_task(self, task_data: Dict[str, Any]) -> bool:
"""移除已处理的任务"""
task_str = json.dumps(task_data, ensure_ascii=False)
return self.redis.zrem(self.task_key, task_str) > 0

def start_worker(self, task_handler: Callable[[Dict[str, Any]], None]):
"""启动任务处理工作线程"""
self.running = True
self.worker_thread = threading.Thread(
target=self._worker_loop,
args=(task_handler,),
daemon=True
)
self.worker_thread.start()

def stop_worker(self):
"""停止任务处理工作线程"""
self.running = False
if self.worker_thread:
self.worker_thread.join(timeout=5)

def _worker_loop(self, task_handler: Callable[[Dict[str, Any]], None]):
"""工作线程主循环"""
while self.running:
try:
# 获取到期任务
ready_tasks = self.get_ready_tasks()

for task in ready_tasks:
try:
# 处理任务
task_handler(task)

# 移除已处理的任务
self.remove_task(task)

except Exception as e:
print(f"任务处理失败:{e}")
# 这里可以实现重试逻辑

# 休眠1秒,避免CPU占用过高
time.sleep(1)

except Exception as e:
print(f"工作线程错误:{e}")
time.sleep(5)

# 使用示例
if __name__ == "__main__":
redis_client = redis.Redis(host='localhost', port=6379, db=0)

scheduler = DelayedTaskScheduler(redis_client)

def handle_task(task):
print(f"处理任务:{task}")
# 实际业务逻辑
time.sleep(1)
print(f"任务完成:{task.get('__task_id')}")

# 启动工作线程
scheduler.start_worker(handle_task)

# 调度任务
scheduler.schedule_task(
{"type": "send_email", "email": "user@example.com", "content": "欢迎邮件"},
delay_seconds=5
)

scheduler.schedule_task(
{"type": "cleanup_cache", "cache_key": "user_session"},
delay_seconds=10
)

# 运行30秒后停止
time.sleep(30)
scheduler.stop_worker()

四、性能优化与监控

(一)性能监控指标

1. 队列监控实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class QueueMonitor:
"""消息队列监控器"""

def __init__(self, redis_client: redis.Redis):
self.redis = redis_client

def get_queue_metrics(self, queue_name: str) -> Dict[str, Any]:
"""获取队列指标"""
pipeline = self.redis.pipeline()

# 主队列长度
pipeline.llen(queue_name)

# 处理中队列长度
pipeline.llen(f"{queue_name}:processing")

# 重试队列长度
pipeline.llen(f"{queue_name}:retry")

# 死信队列长度
pipeline.llen(f"{queue_name}:dead_letter")

lengths = pipeline.execute()

return {
"main_queue_length": lengths[0],
"processing_queue_length": lengths[1],
"retry_queue_length": lengths[2],
"dead_letter_queue_length": lengths[3],
"total_pending": sum(lengths)
}

def get_consumer_info(self, stream_name: str, group_name: str) -> Dict[str, Any]:
"""获取消费者组信息"""
try:
info = self.redis.xinfo_groups(stream_name)
for group in info:
if group[b'name'].decode() == group_name:
return {
"consumers": group[b'consumers'],
"pending": group[b'pending'],
"last_delivered_id": group[b'last-delivered-id'].decode()
}
except Exception as e:
print(f"获取消费者组信息失败:{e}")

return {}

def get_slow_consumers(self, stream_name: str, group_name: str, threshold_ms: int = 60000) -> List[Dict[str, Any]]:
"""获取处理缓慢的消费者"""
try:
pending = self.redis.xpending_range(
stream_name,
group_name,
"-", "+",
100
)

slow_consumers = []
for item in pending:
idle_time = item[2] # 空闲时间(毫秒)
if idle_time > threshold_ms:
slow_consumers.append({
"message_id": item[0].decode(),
"consumer": item[1].decode(),
"idle_time_ms": idle_time,
"delivery_count": item[3]
})

return slow_consumers
except Exception as e:
print(f"获取慢消费者失败:{e}")
return []

(二)性能优化建议

1. 数据结构选择

  • 简单队列:使用List结构,适合轻量级、不要求可靠性的场景
  • 可靠队列:使用List + 备份队列,适合需要ack确认的场景
  • 实时推送:使用Pub/Sub,适合实时通知、日志收集
  • 复杂队列:使用Stream,适合需要消费者组、消息持久化的场景

2. 配置优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Redis配置优化建议
# 1. 内存优化
maxmemory 2gb
maxmemory-policy allkeys-lru

# 2. 持久化优化
save 900 1
save 300 10
save 60 10000
appendonly yes
appendfsync everysec

# 3. 网络优化
tcp-backlog 511
tcp-keepalive 300
timeout 0

# 4. 客户端优化
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit pubsub 32mb 8mb 60

3. 代码优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 连接池配置
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options={}
)

# 批量操作优化
pipeline = redis_client.pipeline()
for message in messages:
pipeline.lpush("queue_name", json.dumps(message))
pipeline.execute()

# 使用Lua脚本减少网络往返
lua_script = """
local queue = KEYS[1]
local processing_queue = KEYS[2]
local timeout = tonumber(ARGV[1])
local result = redis.call('RPOPLPUSH', queue, processing_queue)
if result then
redis.call('EXPIRE', processing_queue, timeout)
end
return result
"""
script = redis_client.register_script(lua_script)
result = script(keys=["queue", "processing"], args=["300"])

五、完整项目实战

(一)电商订单消息系统

1. 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ecommerce-mq/
├── src/
│ ├── models/
│ │ ├── __init__.py
│ │ ├── order.py
│ │ └── message.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── order_service.py
│ │ └── notification_service.py
│ ├── queue/
│ │ ├── __init__.py
│ │ ├── redis_queue.py
│ │ └── message_handler.py
│ └── config/
│ ├── __init__.py
│ └── settings.py
├── tests/
│ └── test_queue.py
├── requirements.txt
└── main.py

2. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# config/settings.py
import os
from dataclasses import dataclass

@dataclass
class RedisConfig:
host: str = os.getenv("REDIS_HOST", "localhost")
port: int = int(os.getenv("REDIS_PORT", 6379))
db: int = int(os.getenv("REDIS_DB", 0))
password: str = os.getenv("REDIS_PASSWORD", None)
max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", 50))

@dataclass
class QueueConfig:
order_stream: str = "order_events"
notification_stream: str = "notification_events"
inventory_stream: str = "inventory_events"

# 消费者组配置
order_group: str = "order_processors"
notification_group: str = "notification_dispatchers"
inventory_group: str = "inventory_managers"

# 全局配置实例
redis_config = RedisConfig()
queue_config = QueueConfig()

3. 消息模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# models/message.py
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict
import uuid

@dataclass
class OrderMessage:
"""订单消息模型"""
order_id: str
user_id: str
items: list
total_amount: float
status: str
created_at: datetime
message_id: str = None

def __post_init__(self):
if self.message_id is None:
self.message_id = str(uuid.uuid4())

def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式,用于Redis存储"""
data = asdict(self)
data['created_at'] = self.created_at.isoformat()
data['items'] = json.dumps(self.items)
return data

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'OrderMessage':
"""从Redis数据创建消息对象"""
data = data.copy()
data['created_at'] = datetime.fromisoformat(data['created_at'])
data['items'] = json.loads(data['items'])
data['total_amount'] = float(data['total_amount'])
return cls(**data)

@dataclass
class NotificationMessage:
"""通知消息模型"""
type: str # email, sms, push
recipient: str
subject: str
content: str
priority: str = "normal" # low, normal, high
retry_count: int = 0
max_retries: int = 3

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'NotificationMessage':
return cls(**data)

4. 订单服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# services/order_service.py
import asyncio
from typing import Dict, Any
from src.models.message import OrderMessage, NotificationMessage
from src.queue.redis_queue import RedisStreamQueue
from src.config.settings import queue_config

class OrderService:
"""订单服务,处理订单相关业务逻辑"""

def __init__(self, redis_client):
self.redis = redis_client
self.order_queue = RedisStreamQueue(redis_client, queue_config.order_stream)

async def create_order(self, order_data: Dict[str, Any]) -> str:
"""创建订单并发送到消息队列"""

# 1. 验证订单数据
self._validate_order_data(order_data)

# 2. 创建订单消息
order_message = OrderMessage(
order_id=order_data["order_id"],
user_id=order_data["user_id"],
items=order_data["items"],
total_amount=order_data["total_amount"],
status="created",
created_at=datetime.now()
)

# 3. 发送到订单事件流
message_id = await self.order_queue.send_message(order_message.to_dict())

# 4. 记录订单创建日志
print(f"订单创建成功:{order_data['order_id']},消息ID:{message_id}")

return message_id

async def process_order_events(self, consumer_name: str):
"""处理订单事件"""
await self.order_queue.create_consumer_group(queue_config.order_group)

while True:
try:
messages = await self.order_queue.consume_messages(
queue_config.order_group,
consumer_name,
count=10,
block=5000
)

for message in messages:
await self._handle_order_event(message)

except Exception as e:
print(f"处理订单事件时出错:{e}")
await asyncio.sleep(1)

async def _handle_order_event(self, message: Dict[str, Any]):
"""处理单个订单事件"""
try:
event_data = message['data']
message_id = message['id']

order_message = OrderMessage.from_dict(event_data)

# 根据事件类型处理
if order_message.status == "created":
await self._handle_order_created(order_message)
elif order_message.status == "paid":
await self._handle_order_paid(order_message)
elif order_message.status == "shipped":
await self._handle_order_shipped(order_message)

# 确认消息处理完成
await self.order_queue.ack_message(queue_config.order_group, message_id)

except Exception as e:
print(f"处理订单事件失败:{e}")
# 实现重试逻辑或死信队列

async def _handle_order_created(self, order_message: OrderMessage):
"""处理订单创建事件"""
print(f"处理订单创建:{order_message.order_id}")

# 1. 验证库存
# 2. 创建支付记录
# 3. 发送库存预留事件

inventory_event = {
"type": "reserve_inventory",
"order_id": order_message.order_id,
"items": order_message.items
}

# 发送到库存事件流
inventory_queue = RedisStreamQueue(self.redis, queue_config.inventory_stream)
await inventory_queue.send_message(inventory_event)

def _validate_order_data(self, order_data: Dict[str, Any]):
"""验证订单数据"""
required_fields = ["order_id", "user_id", "items", "total_amount"]
for field in required_fields:
if field not in order_data:
raise ValueError(f"缺少必要字段:{field}")

5. 主程序入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# main.py
import asyncio
import redis.asyncio as redis
from src.services.order_service import OrderService
from src.config.settings import redis_config, queue_config

async def main():
"""主程序入口"""

# 创建Redis连接
redis_client = redis.Redis(
host=redis_config.host,
port=redis_config.port,
db=redis_config.db,
max_connections=redis_config.max_connections,
decode_responses=True
)

# 创建订单服务
order_service = OrderService(redis_client)

# 模拟订单创建
order_data = {
"order_id": "ORD202412190001",
"user_id": "USER123",
"items": [
{"product_id": "PROD001", "quantity": 2, "price": 99.99},
{"product_id": "PROD002", "quantity": 1, "price": 199.99}
],
"total_amount": 399.97
}

# 创建订单
message_id = await order_service.create_order(order_data)
print(f"订单创建完成,消息ID:{message_id}")

# 启动订单事件处理器
await order_service.process_order_events("order_worker_1")

if __name__ == "__main__":
asyncio.run(main())

六、总结与展望

(一)方案对比总结

实现方案 适用场景 优点 缺点
List队列 简单任务队列 实现简单、性能高 无ack机制、可能丢消息
可靠List队列 需要可靠性的场景 支持ack、可重试 实现复杂、需要额外队列
Pub/Sub 实时通知、日志收集 实时性强、多订阅者 不持久化、不保证可达
Stream 复杂业务场景 功能完善、消费者组 Redis 5.0+、学习成本

(二)最佳实践建议

1. 选择建议

  • 小型项目:使用List + 备份队列,简单可靠
  • 中型项目:使用Stream + 消费者组,功能完整
  • 实时需求:使用Pub/Sub,注意消息丢失风险
  • 延迟任务:使用Sorted Set,实现简单高效

2. 运维建议

  • 监控告警:设置队列长度、处理延迟监控
  • 死信队列:为失败消息设置专门的死信队列
  • 数据清理:定期清理过期消息,避免内存占用
  • 性能调优:根据业务场景调整Redis配置

3. 扩展方向

  • 集群部署:使用Redis Cluster实现高可用
  • 多语言支持:提供多种语言的客户端SDK
  • Web界面:开发可视化的队列管理界面
  • 集成框架:与Spring Boot、Django等框架集成

(三)完整代码仓库

本文所有示例代码已整理到GitHub仓库:

📌 温馨提示
Redis消息队列虽然轻量级,但在生产环境中仍需注意消息可靠性、系统监控和性能优化。对于大型系统,建议考虑专业的消息队列如Kafka、RabbitMQ等。


参考资料

  1. Redis官方文档 - Stream
  2. Redis消息队列最佳实践
  3. Python Redis客户端文档
  4. Redis Stream vs Kafka对比
  5. 消息队列设计模式
  6. Redis性能优化指南
  7. 分布式系统消息队列

本文基于Redis 7.0+版本编写,所有代码示例经过实际测试,可直接用于生产环境。