redis stream的所有方法以及用处和使用场景

发布时间 2023-12-14 09:52:58作者: 岳宗柯


Redis Stream 是 Redis 提供的一种数据结构,用于支持实时消息流。它是一个有序、可持久化的消息日志,支持多个消费者组消费消息,同时保留消息的发布顺序。以下是 Redis Stream 的一些主要方法以及它们的用途和使用场景:

一、用途: 将消息添加到 Stream 中。

使用场景: 用于发布消息,创建新的消息流。

XADD key ID field1 value1 [field2 value2 ...]
# 添加消息到 Stream
def add_message(stream_name, message_id, message_data):
    redis_client.xadd(stream_name, {message_id: message_data})

二、用途: 按范围获取消息。

使用场景: 用于检索指定范围内的消息,支持 COUNT 选项以限制返回的消息数量。

XRANGE key start end [COUNT count]
# 获取指定范围内的消息
def get_messages_in_range(stream_name, start, end):
    messages = redis_client.xrange(stream_name, start, end)
    return messages

三、用途: 阻塞读取消息,支持多个 Stream。

使用场景: 用于消费消息,支持阻塞等待新消息的到来。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 获取当前最新消息的 ID
latest_ids = r.xinfo_stream(stream_name)
latest_id = '0' if not latest_ids else latest_ids['last-generated-id']

# 使用 XREAD 命令读取消息
# BLOCK 0 表示非阻塞,COUNT 表示返回的消息数量
response = r.xread(streams={stream_name: latest_id}, count=10, block=0)

四、用途: 创建消费者组。

使用场景: 在消费消息之前,需要创建消费者组。

XGROUP CREATE key groupname id-or-$ [MKSTREAM]
# 创建消费者组
def create_consumer_group(stream_name, group_name, start_id):
    redis_client.xgroup_create(stream_name, group_name, id=start_id, mkstream=True)

五、用途: 阻塞读取消息并将其分配给消费者组中的消费者。

使用场景: 用于多个消费者协同消费消息,支持阻塞等待新消息的到来。

XREADGROUP GROUP groupname consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 阻塞读取消息
def read_messages_blocking(stream_name, consumer_name, count=1, block=0):
    messages = redis_client.xreadgroup(groupname=consumer_name, consumername='my_consumer', streams={stream_name: '>'}, count=count, block=block)
    return messages

六、用途: 确认消息已被消费。

使用场景: 消费者在成功处理消息后,通过此命令通知 Stream 已经处理完消息。

XACK key group ID [ID ...]
# 确认消息已被消费
def ack_message(stream_name, group_name, message_id):
    redis_client.xack(stream_name, group_name, message_id)

七、用途: 获取待处理的消息列表。

使用场景: 用于获取尚未被确认的消息列表,可以用于监控消费者的处理情况。

XPENDING key group [start end COUNT count] [consumer]
# 获取待处理的消息列表
def get_pending_messages(stream_name, group_name, start='-inf', end='+', count=10):
    pending_messages = redis_client.xpending_range(stream_name, group_name, start, end, count)
    return pending_messages

八、用途: 删除消息。

使用场景: 用于清理 Stream 中的消息,保持消息数量在一个可控制的范围内。

XTRIM key MINID ~
# 删除消息
def delete_messages(stream_name, minid):
    redis_client.xtrim(stream_name, minid=minid)

上述是Redis Stream 中的一些基本方法,可以用于发布、订阅和处理实时消息。这些方法使得 Redis 可以用作可靠的消息队列和实时事件处理系统。

除了上述提到的主要方法,以下是 Redis Stream 支持的一些其他方法:

九、用途: 删除一个或多个消息。

使用场景: 用于删除不再需要的消息。

XDEL key ID [ID ...]
def xdel(redis, key, *message_ids):
    return redis.xtrim(key, *message_ids)

十、用途: 获取指定 Stream 的消息数量。

使用场景: 用于获取消息流的当前长度。

XLEN key
def xlen(redis, key):
    return redis.xlen(key)

十一、用途: 反向按范围获取消息。

使用场景: 与 XRANGE 类似,但按照相反的顺序返回消息。

XREVRANGE key end start [COUNT count]
def xrevrange(redis, key, end, start, count=None):
    return redis.xrevrange(key, end, start, count=count)

十二、用途: 非阻塞读取并分配消息给消费者组中的消费者。

使用场景: 与 XREADGROUP 类似,但是是非阻塞的。

XREADGROUP GROUP groupname consumer STREAMS key [key ...] COUNT count BLOCK milliseconds
def xreadgroup(redis, groupname, consumer, key, count=None, block=None):
    return redis.xreadgroup(groupname, consumer, {key: '>'}, count=count, block=block)

十三、用途: 将消费者组的 last delivered ID 设置为指定值。

使用场景: 用于重置消费者组的状态。

XGROUP SETID key groupname id-or-$
def xgroup_setid(redis, key, groupname, id_or_$):
    return redis.xgroup_create_mkstream(key, groupname, id_or_$)

十四、用途: 销毁消费者组。

使用场景: 在不再需要消费者组时使用。

XGROUP DESTROY key groupname
def xgroup_destroy(redis, key, groupname):
    return redis.xgroup_destroy(key, groupname)

这些方法提供了对 Redis Stream 的更多控制和操作能力,您可以根据具体的需求选择适当的方法。它们共同构成了一个强大的消息处理系统,适用于实时数据流和事件处理场景。

十五、分别XREAD和XREADGROUP

XREAD 是用于直接读取消息的命令,不涉及消费者组。你可以通过指定多个 STREAM 名称来一次性读取多个流中的消息。

XREADGROUP 则是用于消费者组的命令。它允许多个消费者组并发地读取消息,并且支持消息的消费者组内分发。