Redis基础学习:Stream操作

发布时间 2023-11-02 16:46:59作者: 岳宗柯


又来一个不好理解的东西,Stream 类型,而且它是整个 Redis 中对于数据操作最复杂的一种类型。但话又说回来,其实这个东西吧,还是个队列,只不过又是一种换了形式的队列。并且呢,据说是受到很多 Kafka 的影响,我对于 Kafka 仅仅是搭过环境的水平,完全没法用它来进行比较,所以我们的重点还是以理解 Redis 中的 Stream 为主吧。

回顾下我们之前学习过的队列类的数据类型,有 List 可以做普通的队列、有 Sorted Set 可以做定时延时类的队列、有 PubSub 可以做广播类型的系统,那么现在这个 Stream 又是干嘛的呢?

它呀,和广播有点像,也是多个客户端消费者可以接收,然后呢?数据不会 POP ,也就是说数据还在,消费者可以根据需要指定消费哪些数据,可以把之前的消息再用别的业务处理客户端消费一次,或者仅仅只等待消费最新的。然后它还可以分组,如果某个分组的消费出问题了还可以转移给别的消费者去处理。

是不是感觉有点懵逼?别急,我也懵逼,但是作用已经很清晰了,这是一个可以作为高可用、可存储的队列系统,相对于我们上回讲的 Pub/Sub 的问题,它是真的一个完整的多消费者队列功能实现,真有点正式的 消息队列 工具的意思。

1.基本操作

基本操作无外乎就是添加、修改、删除之类的了,我们一个一个来看下。

XADD

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

添加 Stream 数据使用的是 XADD 命令,它的参数很多,我们就学习最简单的步骤就好了。

127.0.0.1:6379> XADD a * name boy age 20
"1652342833001-0"
127.0.0.1:6379> XADD a * name girl age 17
"1652342848364-0"

中间那个 * 是啥意思?注意看命令的签名,*|ID ,意思是我们可以为数据指定一个 ID ,如果使用 * 的话就是自动生成一个 时间戳-序号 的 ID ,如果要自己手动指定的话,也必须是有个 - 的形式,比如 0-1、1111-1212 这样的格式。ID 可以用于保证消息的有序、不重复。

XRANGE

添加的内容是键值对形式的数据,和 Hash 一样,然后返回的内容就是新添加数据的 ID 。接下来就是数据的查询,XRANGE 命令就可以了,注意它的参数是 ID ,Stream 类型要求 ID 必须是递增有序的,所以可以使用 ID 来进行范围查询。

127.0.0.1:6379> XRANGE a - +
1) 1) "1652342833001-0"
   2) 1) "name"
      2) "boy"
      3) "age"
      4) "20"
2) 1) "1652342848364-0"
   2) 1) "name"
      2) "girl"
      3) "age"
      4) "17"
127.0.0.1:6379> XRANGE a 1652342833001-0 +
1) 1) "1652342833001-0"
   2) 1) "name"
      2) "boy"
      3) "age"
      4) "20"
2) 1) "1652342848364-0"
   2) 1) "name"
      2) "girl"
      3) "age"
      4) "17"
127.0.0.1:6379> XRANGE a 1652342833002 +
1) 1) "1652342848364-0"
   2) 1) "name"
      2) "girl"
      3) "age"
      4) "17"

ps:+ 和 - 号代表的是最大值和最小值的意思,也就是全部查找。默认的 XRANGE 是正序查找,也就是根据 ID 从小到大,所以要用 - 号在前的写法,它的反向查找命令则是从大到小的查找,我们后面会看到。

XDEL

对于删除来说,使用的是 XDEL 命令。

127.0.0.1:6379> XDEL a 1652342833001-0
(integer) 1
127.0.0.1:6379> XRANGE a - +
1) 1) "1652342848364-0"
   2) 1) "name"
      2) "girl"
      3) "age"
      4) "17"
127.0.0.1:6379> XDEL a 1652342848364
(integer) 1
127.0.0.1:6379> ttl a
(integer) -1

在这里,我们最后还用 ttl 试了一下 a 这个 key ,你会发现它竟然还存在。默认情况下,其它的数据类型比如 Hash 或者 List 之类的,如果数据删光了,那么这个 key 也会删掉,但是 Stream 不会,为什么呢?其实就是为了消费者监听的时候不至于出现问题嘛。这里是比较特别的一个地方,也是可以用在面试或者被面试情况下的一个小知识点哦。

普通的基本操作就是上面那些了,接下来我们试试相同 ID 以及小于当前最大 ID 的情况下能不能添加数据。

127.0.0.1:6379> XADD a 0-1 name wx age 4
"0-1"
127.0.0.1:6379> XADD a 0-1 name wx age 4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD a 0-2 name wx age 4
"0-2"
127.0.0.1:6379> XADD a 1-1 name wx age 4
"1-1"
127.0.0.1:6379> XADD a 1-5 name wx age 4
"1-5"
127.0.0.1:6379> XADD a 1-3 name wx age 4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

很明显,相同的 ID 必然是无法添加的,然后我们测试的是直接将 ID 设置到 1-5 ,完了再添加 1-3 的数据,同样也是无法添加成功的。这个序列是前后都要大于,比如说我们如果换成 2-1 就可以插入了,- 前面的先比较,相同的再比较后面的序号。

XREVRANGE

最后,XREVRANGE 可以反向查询,也就是根据 ID 从大到小查询,另外 XRANGE 和 XREVRANGE 也都有 COUNT 参数,可以控制返回数据的数量。

127.0.0.1:6379> XREVRANGE a + -
1) 1) "1-5"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"
2) 1) "1-1"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"
3) 1) "0-2"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"
127.0.0.1:6379> XREVRANGE a 0 1
(empty array)
127.0.0.1:6379> XREVRANGE a 0-1 1-1
(empty array)
127.0.0.1:6379> XREVRANGE a 1-1 0-1
1) 1) "1-1"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"
2) 1) "0-2"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"
127.0.0.1:6379> XREVRANGE a + - count 1
1) 1) "1-5"
   2) 1) "name"
      2) "wx"
      3) "age"
      4) "4"

2.监听项目

要是仅仅是查询的话,那么 Stream 其实也就并没有什么特别的地方了。前面说过,它也是一种队列系统,那么如果我们要监听新来的消息要怎么办类?当然有相关的命令可以供我们使用了,第一个就来看看 XREAD 。

XREAD

其实他和 XRANGE 很像,不过是根据指定 ID 读取大于这个 ID 的最新数据。

127.0.0.1:6379> xread count 2 streams a 0
1) 1) "a"
   2) 1) 1) "0-2"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"
      2) 1) "1-1"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"

我们给定的是 0 ,就从 0-xxx 的数据开始,通过 COUNT 可以指定返回数据的条数。如果更换一下 ID 信息,就可以很明显地看到不同。

127.0.0.1:6379> xread streams a 1
1) 1) "a"
   2) 1) 1) "1-1"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"
      2) 1) "1-5"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"
127.0.0.1:6379> xread streams a 1-5
(nil)
127.0.0.1:6379> xread streams a 1-1
1) 1) "a"
   2) 1) 1) "1-5"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"

如果只是这样,未免就太没意思了。XREAD 是可以阻塞运行的,同时,还有一个特殊的 ID 值,可以让我们去获取最新添加进来的数据。

127.0.0.1:6379> xread block 0 streams a $
1) 1) "a"
   2) 1) 1) "1-6"
         2) 1) "name"
            2) "wx"
            3) "age"
            4) "4"
(12.95s)

BLOCK 参数,后面的 0 表示的是超时时间。添加了这个参数之后是不是就非常像 BLPOP 之类的命令了。然后 $ 这个 ID 符号表示的就是等待并获取最新添加进来的数据。注意,不会显示之前已经存在的老数据。

这一套下来有点像什么呢?其实非常类似于我们在 Linux 中使用的 tail -f 这个命令。

是不是看出不同了?List 取出数据是需要 POP ,之后 List 中这条数据就没有了。PubSub 是订阅一个频道,PUBLISH 往这个频道发送消息,如果之前你没有订阅,这个频道之前接收过的内容也是直接没有了。而 XREAD ,不仅可以获得最新的,老的也没有删掉,并且同时还可以阻塞进行。

3.消费者组

简单地理解 ,消费者组就是可以把多个客户端划分到同一个组中。然后,不同的组一起去消费这个 Stream 队列中的内容,和 Pub/Sub 的广播形式很像。不同的是,Stream 中可以让几个客户端划分到不同的组中,虽说还是处理数据吧,但是队列中的数据因为并没有消失,所以可以重复处理或者转交给其它消费者组处理,从而应对一些特殊情况,比如说某个消费者组出现问题,或者有大量堆积之类的。

要使用消费者组我们要先创建组。

127.0.0.1:6379> XGROUP create a agroup $
OK

命令很简单,XGROUP 命令,紧跟着 CREATE 参数,接着指定 Stream 数据的 key ,后面是组的名称,最后还有一个 ID ,我们可以直接使用上面学习过的 $ ,表示的就是接收最新 XADD进来的数据。然后我们就添加一些数据,注意,因为使用了 $ ,所以要在创建完组之后再添加数据,要不在组里面是查不到数据的。

127.0.0.1:6379> XADD a * msg red
"1652346780402-0"
127.0.0.1:6379> XADD a * msg blue
"1652346783165-0"
127.0.0.1:6379> XADD a * msg yellow

好了,我们可以通过 XREADGROUP 命令来获取组中的数据,它的参数签名是这样的。

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

GROUP 后面要跟上组名并且起一个消费者名字。然后可以指定数量,也可以阻塞 BLOCK 。再之后是 STREAMS 后面需要跟着指定的 Stream 键名。就像下面这样使用。

127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"
   2) 1) 1) "1652346780402-0"
         2) 1) "msg"
            2) "red"
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"
   2) 1) 1) "1652346780402-0"
         2) 1) "msg"
            2) "red"

奇怪了,怎么又来了一个 > 符号?这个符号表示 目前为止从未传递给其他消费者 的消息内容。另外我们也可以指定一个 ID 或者给个 0 ,也能看到这条 red 数据。注意,我们一定要先通过 > 取出一条数据,然后再使用 0 才能读到这条数据。如果直接上来就是使用 0 的话,是查不到数据的,大家可以自己试试。

为什么要这样呢?其实 > 之后,数据是被读到了当前这个消费者组的pending entries list 中,它代表的是 待办实体列表 。我们在 c1 这个消费者中,会维护一个自己的这个列表,表示 c1 拿到了这个数据。但是,现在并不表示这个数据就被处理完成了。看不懂没关系,我们再向下看。

127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"
   2) 1) 1) "1652346783165-0"
         2) 1) "msg"
            2) "blue"
127.0.0.1:6379> XREADGROUP group agroup c1 count 10 streams a 0
2) 1) "a"
   2) 1) 1) "1652346780402-0"
         2) 1) "msg"
            2) "red"
      2) 1) "1652346783165-0"
         2) 1) "msg"
            2) "blue"
127.0.0.1:6379> XACK a agroup 1652346780402-0
(integer) 1
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"
   2) 1) 1) "1652346780402-0"
         2) 1) "msg"
            2) "red"

首先,我们再拿过来一条数据,现在 c1 里面有两条数据了。接着我们使用 XACK 去确认这条数据被处理完了。再继续看,pending entries list 中就没有 red 这条命令了。然后我们继续确认消费掉 blue 这条数据,现在整个 pending entries list 就空了。ACK 应答机制也是正式的消息队列中都有的功能。

127.0.0.1:6379> XACK a agroup 1652346783165-0
(integer) 1
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"
   2) (empty array)

我们可以继续再读取新的数据进来。

127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"
   2) 1) 1) "1652346783165-0"
         2) 1) "msg"
            2) "yellow"
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"
   2) 1) 1) "1652346785825-0"
         2) 1) "msg"
            2) "yellow"

有意思吧?为什么要有这样一个机制呢?如果是普通队列,假设在处理过程中出现异常了,其实这条消息就被 POP 掉了,如果要恢复再次处理的话我们就要把它重新加入到队列中。而 Stream 则是通过确认机制,在我们确定消费完成之后,进行确认操作,之后这条数据才算是彻底被消费掉了。同时,需要注意的是,这只是在这一个组中处理的。假如当前我们的消费者 c1 出现了问题,那么我们可以把这条信息再转移给另一个消费者 c2 ,正好 c2 是个日志记录或者其它相关处理的消费者程序,这时就可以进行记录错误信息或者重新入队的操作了,甚至就把 c2 当成是一个“死信队列”(参考 RabbitMQ)。

4.转移

先再添加两条数据。

127.0.0.1:6379> XADD a * msg pink
"1652347643861-0"
127.0.0.1:6379> XADD a * msg white
"1652347649468-0"

然后我们再拿过来一条数据放到 c1 中,查看当前已经有两条数据。

127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"
   2) 1) 1) "1652347643861-0"
         2) 1) "msg"
            2) "pink"
127.0.0.1:6379> XREADGROUP group agroup c1 count 2 streams a 0
1) 1) "a"
   2) 1) 1) "1652346785825-0"
         2) 1) "msg"
            2) "yellow"
      2) 1) "1652347643861-0"
         2) 1) "msg"
            2) "pink"

然后我们再通过 XREADGROUP 来指定一个 c2 消费者读取当前组中的数据。

127.0.0.1:6379> XREADGROUP group agroup c2 count 1 streams a >
1) 1) "a"
   2) 1) 1) "1652347649468-0"
         2) 1) "msg"
            2) "white"

好了,接下来我们使用 XPENDING 来查看当前组中的 pending entries list 情况。一共有三条 pending 数据,最小的是 1652346785825-0 ,最大的是 1652347649468-0 ,有两条 c1 在处理,有一条 c2 在处理。之前的 red 和 blue 我们已经 XACK 掉了,所以这里就只有这三条了。

127.0.0.1:6379> XPENDING a agroup
1) (integer) 3
2) "1652346785825-0"
3) "1652347649468-0"
4) 1) 1) "c1"
      2) "2"
   2) 1) "c2"
      2) "1"

同时,我们也可以指定消费者及使用范围参数进行查询。

127.0.0.1:6379> XPENDING a agroup - + 10 c2
1) 1) "1652347649468-0"
   2) "c2"
   3) (integer) 34869
   4) (integer) 4

好了,现在情况就是这么个情况,c1 中有两条未确认的信息,c2 中有一条,现在 c1 处理不了了,我想把其实中一条转移到 c2 来处理。非常简单,使用 XCLAIM 命令就可以。

127.0.0.1:6379> XCLAIM a agroup c2 0 1652346785825-0
1) 1) "1652346785825-0"
   2) 1) "msg"
      2) "yellow"

然后再使用 XPENDING 看一下,怎么样,c1 变成一条,c2 变成 2 条了吧。

127.0.0.1:6379> XPENDING a agroup
1) (integer) 3
2) "1652346785825-0"
3) "1652347649468-0"
4) 1) 1) "c1"
      2) "1"
   2) 1) "c2"
      2) "2"

您要还不信,咱们直接 XREADGROUP 查询一下 c2 ,是不是 yellow 这条数据已经过来了呀。

127.0.0.1:6379> XREADGROUP group agroup c2 count 2 streams a 0
1) 1) "a"
   2) 1) 1) "1652346785825-0"
         2) 1) "msg"
            2) "yellow"
      2) 1) "1652347649468-0"
         2) 1) "msg"
            2) "white"

总结一下,在组中,只要是没有 XACK 的数据,其实都是还可以再次进行消费的数据,你可以直接查询出来那么就是可以再消费的数据。而真正被消费掉的数据只要确认一下就可以了。

那么如果是不同的组呢?比如我们再建立一个消费者组,然后还是从 a 中读取数据。这个时候,两个组会同时消费一样的数据。注意了,划重点,只有一个组中不同的消费者会拿到不同的数据,而不是不同的组,不同的组是能够消费相同的数据的。大家可以自己尝试一下哦。

5.其它

好了,核心内容说完了,说实话,还是比较懵逼的。不过大概的概念大家应该能看得明白了吧,实在不行就当成是一个比较高级的队列,再不行等咱学完了其它正式的消息队列系统之后再回来研究一下呗。希望到时候别直接把我上面的理解全都颠覆了,直接就说这 TM 写得啥玩意,压根不对!!

学习嘛,就是这样,对错也只是一时的,现在能理解到什么程度就是什么程度,将来发现现在的理解有问题反而是好事,也能更加加深对于这个知识点的印象。

剩下的就是一些辅助命令了。首先就是 XINFO ,一看名字就是查看 Stream 类型的一些信息用的。

127.0.0.1:6379> XINFO stream a
 1) "length"
 2) (integer) 5
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1652347649468-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1652346760526-0"
    2) 1) "msg"
       2) "begin"
13) "last-entry"
14) 1) "1652347649468-0"
    2) 1) "msg"
       2) "white"
127.0.0.1:6379> XINFO groups a
1) 1) "name"
   2) "agroup"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1652347649468-0"
127.0.0.1:6379> XINFO consumers a agroup
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 544241
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 421324

它可以查看 Stream 信息,也可以查看指定的组以及组内的消费者信息,同时用它也可以看到 pending 中的信息,感觉不错吧。然后是 XLEN ,比较简单,就是 Steam 中的数据数量。

127.0.0.1:6379> XLEN a
(integer) 5

看到没有,上面我们不管是 XREAD 还是 XREADGROUP 操作了半天,本质上对 Stream 根本没啥影响,别人还是老老实实的 5 条数据。

127.0.0.1:6379> XGROUP destroy a agroup
(integer) 1
127.0.0.1:6379> XINFO consumers a agroup
(error) NOGROUP No such consumer group 'agroup' for key name 'a'

XGROUP 可以创建,使用的是 CREATE ,那么必须也可以删组嘛,使用的就是 XGROUP DESTROY 命令。关于 XGROUP 更多的命令选择,可以使用 XGROUP HELP 来查看。

6.裁剪

裁剪是啥?好吧,还是来简单的理解,就是我们这个 Stream 中只留下指定数量的数据,别的都被裁剪掉了。

127.0.0.1:6379> xtrim a maxlen 4
(integer) 2

使用的就是 XTRIM 命令,MAXLEN 就是留下的最大数量,然后裁剪掉的是最早的那些数据,比如我们这里就裁剪掉了最早的 red 和 blue 。

127.0.0.1:6379> xrange a - +
1) 1) "1652346785825-0"
   2) 1) "msg"
      2) "yellow"
1) 1) "1652347643861-0"
   2) 1) "msg"
      2) "pink"
2) 1) "1652347649468-0"
   2) 1) "msg"
      2) "white"

7.总结

好了,到此为止,整个 Redis 中和数据类型或数据结构相关的基础知识我们就学习完了。当然还有别的基本命令可能会和这些数据类型相关的内容有关,等我们学习到的时候再说。后面基础系列还将继续学习其它的相关命令,如果你也和我一样想要完全从头好好学一遍 Redis 的话,紧跟着大部队的步伐不要掉队哦。