sorted set在没有重复元素的集合基础上,每个元素多了一个分数score属性作为权重,查询时可以用score排序。
Sorted Set类型的常用操作
ZADD 添加元素
127.0.0.1:6379> ZADD rank 98 u1 (integer) 1 127.0.0.1:6379> ZADD rank 95 u2 (integer) 1 127.0.0.1:6379> ZADD rank 60 u3 (integer) 1 127.0.0.1:6379> ZADD rank 76 u4 (integer) 1 127.0.0.1:6379> ZADD rank 77 u5 (integer) 1
ZCARD 获取集合成员数量
127.0.0.1:6379> ZCARD rank (integer) 5
ZCOUNT 获取指定分数段内的成员数量
127.0.0.1:6379> ZCOUNT rank 80 100 (integer) 2
ZINCRBY 给集合中指定成员增加指定分数
127.0.0.1:6379> ZINCRBY rank 2 u3 "62"
ZRANGE/ZREVRANGE 返回按索引排序的成员
127.0.0.1:6379> ZRANGE rank 0 1 # 分数从小到大 1) "u3" 2) "u4" 127.0.0.1:6379> ZREVRANGE rank 0 2 # 分数从大到下 1) "u1" 2) "u2" 3) "u5"
ZRANGEBYSCORE/ZREVRANGEBYSCORE 返回按指定分数段内的元素列表
127.0.0.1:6379> ZRANGEBYSCORE rank 80 100 # 分数段从小到大 1) "u2" 2) "u1" 127.0.0.1:6379> ZREVRANGEBYSCORE rank 100 80 # 分数段从大到小 1) "u1" 2) "u2"
ZREM 删除指定元素
127.0.0.1:6379> ZREM rank u2 (integer) 1
实际应用场景
1.排行榜
实现一个实时变化的积分排行榜,可以利用 ZREVRANGE 轻松展现最高排名,并实现分页查询
首先将成员名单录入
127.0.0.1:6379> ZADD rank 0 player1 0 player2 0 player3 0 player4 0 player5 0 player6 (integer) 6
当然也可以在合适的时机将新增的成员单个写入
然后开始实时计算增加的分数 使用ZINCRBY, 增加完后会直接返回当前成员的最终分数
127.0.0.1:6379> ZINCRBY rank 50 player1 "50" 127.0.0.1:6379> ZINCRBY rank 53 player3 "53" 127.0.0.1:6379> ZINCRBY rank 20 player5 "20" 127.0.0.1:6379> ZINCRBY rank 40 player6 "40" 127.0.0.1:6379> ZINCRBY rank 40 player5 "60" 127.0.0.1:6379> ZINCRBY rank -2 player6 # 负数也是可以的 相当于减分 "38"
也可以不用初始化成员分数 直接给一个不存在的的成员增加分数,默认会创建并增加分数,默认从0开始
127.0.0.1:6379> ZINCRBY rank 55 player7 "55"
最后获取排行榜上前3的成员,使用ZREVRANGE
127.0.0.1:6379> ZREVRANGE rank 0 2 1) "player5" 2) "player7" 3) "player3"
如果要带上分数,需要增加一个参数 WITHSCORES
127.0.0.1:6379> ZREVRANGE rank 0 2 WITHSCORES 1) "player5" 2) "60" 3) "player7" 4) "55" 5) "player3" 6) "53"
2.定时/延迟任务
定时/延迟任务,或是延迟队列类功能,最重要的是在指定时间后才开始处理相关任务。
以下是大致实现方式
任务生产端:
每个任务生成一个任务ID(要有唯一性),将具体的任务信息可以放在其他数据库里,当然也可以放在Redis里。然后将任务ID作为sorted set的成员, 需要执行任务时间点的时间戳作为score写入(如果是延迟任务就计算出具体的时间戳)
127.0.0.1:6379> ZADD tasks 1000 t1 # 假设当前时间戳从1000开始 (integer) 1 127.0.0.1:6379> ZADD tasks 1003 t2 # 3s后才能触发的任务 (integer) 1 127.0.0.1:6379> ZADD tasks 1004 t3 (integer) 1 127.0.0.1:6379> ZADD tasks 1004 t4 (integer) 1
这时sorted set里的任务顺序大概是这样(按分数从小达到)
t1 | t2 | t3 | t4 |
---|---|---|---|
1000 | 1003 | 1004 | 1004 |
任务消费端
启动一个进程循环获取任务, 获取分数的范围在当前时间戳之前(表示已经到期的任务)
127.0.0.1:6379> ZRANGEBYSCORE tasks -inf 1000 1) "t1"
程序代码大概可以这样写
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in TaskIds { # TODO 执行任务处理逻辑 } }
不过有个很明显的问题, 当时间戳到1003的时候,任务t1还是能够获取到
127.0.0.1:6379> ZRANGEBYSCORE tasks -inf 1003 1) "t1" 2) "t2"
这是因为ZRANGEBYSCORE只是查询出来,并没有删除元素,删除元素需要使用ZREM
完善一下代码
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in taskIds { # TODO 执行任务处理逻辑 # 正常处理了调用ZREM redis.ZREM("tasks", taskId) } }
接下来随着任务越来越多,上面的处理逻辑可能短时间内也处理不完,或者单个任务处理时间较长,就会导致已经到期的任务无法及时处理。
一般情况下,任务之间是没有依赖关系,这时可以考虑横向库容消费进程,并行处理任务。
不过直接将上面的逻辑复制在多个进程里,会有任务重复的问题。
t1 | t2 | t3 | t4 |
---|---|---|---|
1000 | 1003 | 1004 | 1004 |
进程1查询点 | |||
进程2查询点 |
就像上面一样,可能出现,两个进程同时获取到t1任务
有两种处理方式:
第一种 增加分布式锁,可以基于Redis的set类型数据
代码如下
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in taskIds { addSuccess = redis.SADD("tasks:lock", taskId) if addSuccess == 0 { # 写入不成功 说明有其他进程在处理该任务 continue } # TODO 执行任务处理逻辑 redis.ZREM("tasks", taskId) # 正常处理了调用ZREM 删除任务 redis.SREM("tasks:lock", taskId) # 释放锁, 当然如果是任务处理失败 也可以释放锁 以便可以重试任务 } }
如果某一段时间没有任务,会存在 ZRANGEBYSCORE 空转的情况,增加了不必要的网络调用
可以在获取不到任务的时候,等待一个任务时间最小间隔 通常是1s
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) if len(taskIds) == 0 { sleep(1) continue } for taskId in taskIds { addSuccess = redis.SADD("tasks:lock", taskId) if addSuccess == 0 { # 写入不成功 说明有其他进程在处理该任务 continue } # TODO 执行任务处理逻辑 redis.ZREM("tasks", taskId) # 正常处理了调用ZREM 删除任务 redis.SREM("tasks:lock", taskId) # 释放锁, 当然如果是任务处理失败 也可以释放锁 以便可以重试任务 } }
第二种 任务实际处理逻辑继续下沉
当前程序不进行任务处理 只取到期的任务,任务处理交给其他任务处理进程专门处理,只向List里推送任务
# 伪代码 # 定时获取任务程序 保留一个 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) if len(taskIds) == 0 { sleep(1) continue } for taskId in taskIds { redis.LPUSH("tasks:list", taskId) # 下推到队列任务 redis.ZREM("tasks", taskId) # 删除指定元素 } } # 下层的任务处理程序 可以横向扩展运行多个进程 while true { taskId = redis.BRPOP("tasks:list", 10) # 还考虑服务器 闲置连接超时 断开的异常 if taskId is null { continue } # TODO 执行任务处理逻辑 # 任务如果处理失败需要重试 可以考虑重新ZADD到 tasks 里 }
如果生产消息的速度非常快,也是可以考虑把 “定时获取任务程序” 也横向扩展,不过需要增加分布式锁,避免重复消费,可参考 第一种 的处理方式;也可以将任务按业务类型分类,放在不同的sorted set 中分别处理,互不影响。