消息队列

Redis 中的流是包含零个或任意多个流元素的有序队列,队列中的每个元素都包含一个 id 和任意多个键值对,这些元素会根据 id 的大小在流中有序地进行排列。

流中元素 id

基本命令

XADD

将一个带有指定 id 和键值对的元素追加到 stream 中,并返回插入的 id。

id 的限制:

  1. 同一个流中的不同元素是不允许使用相同ID的;

  2. 新元素的ID必须比流中所有已有元素的ID都要大;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 将 id = 1100000000000-12345,k1 v1 的键值对添加到 s1 中
-- id 由毫秒事件 millisecond 和顺序编号 sequcen number 组成
XADD s1 1100000000000-12345 k1 v1

-- 1000000000000-0. 只包含毫秒时间,没有编号。将编号设置为 0
XADD s1 1000000000000 k1 v1

-- 使用 * 来自动生成元素 id
XADD s1 * k2 v2

-- 使用先进先出的方式限制 s1 的长度为 2
XADD s1 MAXLEN 2 * k1 v1
XADD s1 MAXLEN 2 * k2 v2
-- 此时 s1 中保存 k2 和 k3
XADD s1 MAXLEN 2 * k3 v3

时间复杂度:O(logn)

XTRIM

将 streamn 修剪为最大长度,也是采用先进先出的方式,返回移除的个数。

1
2
-- 1
XTRIM s1 MAXLEN 1

时间复杂度:O(logn + m)

XDEL

根据 id 删除一个或多个元素,返回删除元素数量。

1
2
3
XDEL s1 1683948051401-0

XDEL s1 1683948051401-1 1683948051401-2

时间复杂度:O(logn * m)

XLEN

返回 stream 中元素个数,不合法就返回 0.

1
2
3
4
5
6
XADD s1 * k1 v1
XADD s1 * k2 v2
XADD s1 * k3 v3

-- 3. 返回 s1 中元素个数
XLEN s1

时间复杂度:O(1)

XRANGE

提供获取 stream 中元素的各种方式,如果不合法返回 nil。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
XADD s1 * k1 v1
XADD s1 * k2 v2
XADD s1 * k3 v3
XADD s1 * k4 v4
XADD s1 * k5 v5
XADD s1 * k6 v6
XADD s1 * k7 v7

-- 根据 id 获取指定元素
XRANGE s1 1683948592439-0 1683948592439-0
-- 获取 id 闭区间内的所有元素
XRANGE s1 1683948592251-0 1683948592773-0
-- 获取 stream 中所有元素
XRANGE s1 - +
-- 对返回元素数量做限制,根据 id 升序返回
XRANGE s1 - + COUNT 2

时间复杂度:O(logn + m)

XREVRANGE

同 XRANGE,唯一不同的是 id 的逆序版本。

XREAD

提供获取 stream 中元素的各种方式,如果不合法返回 nil。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
XADD s1 * k1 v1
XADD s1 * k2 v2
XADD s1 * k3 v3
XADD s1 * k4 v4
XADD s1 * k5 v5
XADD s1 * k6 v6
XADD s1 * k7 v7

XADD s2 * k1 v1
XADD s2 * k2 v2
XADD s2 * k3 v3
XADD s2 * k4 v4
XADD s2 * k5 v5
XADD s2 * k6 v6
XADD s2 * k7 v7

-- 返回 s1 中 id > 1683948592043-0 的前 3 个元素
XREAD COUNT 3 STREAMS s1 1683948592043-0
-- 同时返回 s1 中 id > 1683948592043-0 的前 3 个元素,s2 中 id > 1683949330708-0 的前 3 个元素
XREAD COUNT 3 STREAMS s1 s2 1683948592043-0 1683949330708-0
-- 阻塞式读取 s2 中 id > 1683949809119-0 的前 2 个元素,时间单位 ms, 0 表示一直等待
XREAD BLOCK 0 COUNT 2 STREAMS s2 1683949809119-0

时间复杂度:O(logn + m)

XGROUP

管理消费者组,增删改。

id 用来限定消费者能够接收到消息范围。

1
2
3
4
5
6
-- 在 s1 上创建一个名为 all-msg 的消费者组,可以接收 > 0-0 的消费者 时间复杂度:O(1)
XGROUP CREATE s1 all-msg 0-0
-- 修改消费者组的 id 时间复杂度:O(1)
XGROUP SETID s1 all-msg 10086
-- 删除消费者组 时间复杂度:O(n + m)
XGROUP DESTROY s1 all-msg

总结

  • stream 用来支持消息队列;
  • stream 中包含零个到多个元素的有序队列;
  • stream 的 id 由“毫秒时间”和“顺序编号“组成;
  • 消费者组允许将一个 stream 从逻辑上划分为多个不同的 stream,让 group 所属的 comsumer 消费;
  • 消息的生命周期:不存在 -> 未递送 -> 待处理 -> 已确认