1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| from typing import Dict, Any, List, Optional
class RedisStreamQueue:
    """Message queue implemented on top of a Redis Stream.

    Features: consumer groups, message persistence, message
    acknowledgement and blocking reads. IDs, field names and field values
    that the client returns as ``bytes`` are decoded to ``str`` before
    being handed back to the caller.
    """

    def __init__(self, redis_client: "redis.Redis", stream_name: str):
        self.redis = redis_client
        self.stream_name = stream_name

    @staticmethod
    def _to_str(value: Any) -> Any:
        """Decode *value* if it is ``bytes``; return anything else unchanged."""
        return value.decode() if isinstance(value, bytes) else value

    @classmethod
    def _normalize_pending(cls, item: Any) -> Dict[str, Any]:
        """Convert one pending-entry record into this class's dict layout.

        Accepts either a mapping keyed by ``message_id`` / ``consumer`` /
        ``time_since_delivered`` / ``times_delivered`` or a positional
        4-item sequence in that same order.
        """
        if isinstance(item, dict):
            message_id = item.get('message_id')
            consumer = item.get('consumer')
            idle_time = item.get('time_since_delivered')
            delivery_count = item.get('times_delivered')
        else:
            message_id, consumer, idle_time, delivery_count = item[:4]
        return {
            'id': cls._to_str(message_id),
            'consumer': cls._to_str(consumer),
            'idle_time': idle_time,
            'delivery_count': delivery_count,
        }

    def send_message(self, message: Dict[str, Any], max_len: int = 10000) -> str:
        """Append a message to the stream.

        Args:
            message: Field/value mapping stored as the stream entry.
            max_len: Maximum stream length; older entries are removed once
                it is exceeded (passed as ``maxlen`` with
                ``approximate=True``).

        Returns:
            The ID of the newly added entry.
        """
        message_id = self.redis.xadd(
            self.stream_name,
            message,
            maxlen=max_len,
            approximate=True,
        )
        return self._to_str(message_id)

    def create_consumer_group(
        self, group_name: str, start_id: str = "0", mkstream: bool = True
    ) -> bool:
        """Create a consumer group, treating "already exists" as success.

        Args:
            group_name: Name of the consumer group.
            start_id: Starting message ID; "0" reads from the beginning,
                "$" delivers only messages added after creation.
            mkstream: Forwarded to ``xgroup_create`` so the stream is
                created when it does not exist yet, instead of the call
                failing on a missing key.

        Returns:
            True when the group was created or already existed.

        Raises:
            redis.ResponseError: For any server error other than BUSYGROUP.
        """
        try:
            self.redis.xgroup_create(
                self.stream_name, group_name, start_id, mkstream=mkstream
            )
        except redis.ResponseError as exc:
            # BUSYGROUP means the group is already there, which is fine.
            if "BUSYGROUP" in str(exc):
                return True
            raise
        return True

    def consume_messages(
        self,
        group_name: str,
        consumer_name: str,
        count: int = 10,
        block: int = 5000
    ) -> List[Dict[str, Any]]:
        """Read new messages for a consumer in a consumer group.

        Args:
            group_name: Name of the consumer group.
            consumer_name: Name of the consumer.
            count: Number of messages to fetch per call.
            block: Blocking wait time in milliseconds.

        Returns:
            A list of ``{'id': ..., 'data': {...}}`` dicts; empty when the
            client returned nothing.
        """
        batches = self.redis.xreadgroup(
            group_name,
            consumer_name,
            {self.stream_name: ">"},  # ">" asks for never-delivered messages
            count=count,
            block=block,
        )

        decode = self._to_str
        result: List[Dict[str, Any]] = []
        for _stream, entries in batches or []:
            for message_id, message_data in entries:
                result.append({
                    'id': decode(message_id),
                    'data': {
                        decode(key): decode(value)
                        for key, value in message_data.items()
                    },
                })
        return result

    def ack_message(self, group_name: str, message_id: str) -> int:
        """Acknowledge that a message has been processed.

        Args:
            group_name: Name of the consumer group.
            message_id: ID of the message to acknowledge.

        Returns:
            The number of messages successfully acknowledged.
        """
        return self.redis.xack(self.stream_name, group_name, message_id)

    def get_pending_messages(
        self,
        group_name: str,
        consumer_name: Optional[str] = None,
        count: int = 100
    ) -> List[Dict[str, Any]]:
        """List messages that were delivered but not yet acknowledged.

        Args:
            group_name: Name of the consumer group.
            consumer_name: Restrict the listing to this consumer; a falsy
                value lists pending messages for the whole group.
            count: Maximum number of entries to return.

        Returns:
            A list of dicts with the keys ``id``, ``consumer``,
            ``idle_time`` and ``delivery_count``.
        """
        pending = self.redis.xpending_range(
            self.stream_name,
            group_name,
            "-",
            "+",
            count,
            # An empty name is treated like "no consumer filter".
            consumername=consumer_name or None,
        )
        return [self._normalize_pending(item) for item in pending or []]

    def get_stream_info(self) -> Dict[str, Any]:
        """Return selected fields of the stream's XINFO STREAM reply.

        Each field is looked up under its ``bytes`` key first and then
        under its ``str`` key, so both undecoded and decoded replies work.
        """
        info = self.redis.xinfo_stream(self.stream_name)

        def field(name: str, default: Any = None) -> Any:
            raw_key = name.encode()
            if raw_key in info:
                return info[raw_key]
            return info.get(name, default)

        return {
            'length': field('length', 0),
            'radix_tree_keys': field('radix-tree-keys', 0),
            'groups': field('groups', 0),
            'last_generated_id': field('last-generated-id', ''),
            'first_entry': field('first-entry'),
            'last_entry': field('last-entry'),
        }
if __name__ == "__main__":
    # Demo: connect to a local Redis, create a consumer group, publish one
    # task and then read and acknowledge it as consumer "worker1".
    client = redis.Redis(host='localhost', port=6379, db=0)
    queue = RedisStreamQueue(client, "task_stream")

    group = "task_processors"
    # "$" makes the group receive only messages added after its creation.
    queue.create_consumer_group(group, "$")

    email_task = {
        "type": "email",
        "recipient": "user@example.com",
        "subject": "欢迎邮件",
        "content": "欢迎注册我们的服务!",
        "priority": "high",
    }
    sent_id = queue.send_message(email_task)
    print(f"发送消息:{sent_id}")

    for received in queue.consume_messages(group, "worker1", count=1):
        print(f"处理消息:{received}")
        queue.ack_message(group, received['id'])
|