Redis 中的流是包含零个或任意多个流元素的有序队列,队列中的每个元素都包含一个 id 和任意多个键值对,这些元素会根据 id 的大小在流中有序地进行排列。
流中元素 id
基本命令
XADD
将一个带有指定 id 和键值对的元素追加到 stream 中,并返回插入的 id。
id 的限制:
同一个流中的不同元素是不允许使用相同ID的;
新元素的ID必须比流中所有已有元素的ID都要大;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| -- 将 id = 1100000000000-12345,k1 v1 的键值对添加到 s1 中 -- id 由毫秒事件 millisecond 和顺序编号 sequcen number 组成 XADD s1 1100000000000-12345 k1 v1
-- 1000000000000-0. 只包含毫秒时间,没有编号。将编号设置为 0 XADD s1 1000000000000 k1 v1
-- 使用 * 来自动生成元素 id XADD s1 * k2 v2
-- 使用先进先出的方式限制 s1 的长度为 2 XADD s1 MAXLEN 2 * k1 v1 XADD s1 MAXLEN 2 * k2 v2 -- 此时 s1 中保存 k2 和 k3 XADD s1 MAXLEN 2 * k3 v3
|
时间复杂度:O(logn)
XTRIM
将 streamn 修剪为最大长度,也是采用先进先出的方式,返回移除的个数。
时间复杂度:O(logn + m)
XDEL
根据 id 删除一个或多个元素,返回删除元素数量。
1 2 3
| XDEL s1 1683948051401-0
XDEL s1 1683948051401-1 1683948051401-2
|
时间复杂度:O(logn * m)
XLEN
返回 stream 中元素个数,不合法就返回 0.
1 2 3 4 5 6
| XADD s1 * k1 v1 XADD s1 * k2 v2 XADD s1 * k3 v3
-- 3. 返回 s1 中元素个数 XLEN s1
|
时间复杂度:O(1)
XRANGE
提供获取 stream 中元素的各种方式,如果不合法返回 nil。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| XADD s1 * k1 v1 XADD s1 * k2 v2 XADD s1 * k3 v3 XADD s1 * k4 v4 XADD s1 * k5 v5 XADD s1 * k6 v6 XADD s1 * k7 v7
-- 根据 id 获取指定元素 XRANGE s1 1683948592439-0 1683948592439-0 -- 获取 id 闭区间内的所有元素 XRANGE s1 1683948592251-0 1683948592773-0 -- 获取 stream 中所有元素 XRANGE s1 - + -- 对返回元素数量做限制,根据 id 升序返回 XRANGE s1 - + COUNT 2
|
时间复杂度:O(logn + m)
XREVRANGE
同 XRANGE,唯一不同的是 id 的逆序版本。
XREAD
提供获取 stream 中元素的各种方式,如果不合法返回 nil。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| XADD s1 * k1 v1 XADD s1 * k2 v2 XADD s1 * k3 v3 XADD s1 * k4 v4 XADD s1 * k5 v5 XADD s1 * k6 v6 XADD s1 * k7 v7
XADD s2 * k1 v1 XADD s2 * k2 v2 XADD s2 * k3 v3 XADD s2 * k4 v4 XADD s2 * k5 v5 XADD s2 * k6 v6 XADD s2 * k7 v7
-- 返回 s1 中 id > 1683948592043-0 的前 3 个元素 XREAD COUNT 3 STREAMS s1 1683948592043-0 -- 同时返回 s1 中 id > 1683948592043-0 的前 3 个元素,s2 中 id > 1683949330708-0 的前 3 个元素 XREAD COUNT 3 STREAMS s1 s2 1683948592043-0 1683949330708-0 -- 阻塞式读取 s2 中 id > 1683949809119-0 的前 2 个元素,时间单位 ms, 0 表示一直等待 XREAD BLOCK 0 COUNT 2 STREAMS s2 1683949809119-0
|
时间复杂度:O(logn + m)
XGROUP
管理消费者组,增删改。
id 用来限定消费者能够接收到消息范围。
1 2 3 4 5 6
| -- 在 s1 上创建一个名为 all-msg 的消费者组,可以接收 > 0-0 的消费者 时间复杂度:O(1) XGROUP CREATE s1 all-msg 0-0 -- 修改消费者组的 id 时间复杂度:O(1) XGROUP SETID s1 all-msg 10086 -- 删除消费者组 时间复杂度:O(n + m) XGROUP DESTROY s1 all-msg
|
总结
- stream 用来支持消息队列;
- stream 中包含零个到多个元素的有序队列;
- stream 的 id 由“毫秒时间”和“顺序编号“组成;
- 消费者组允许将一个 stream 从逻辑上划分为多个不同的 stream,让 group 所属的 comsumer 消费;
- 消息的生命周期:不存在 -> 未递送 -> 待处理 -> 已确认