Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout
, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。
charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli> lpush list hello (integer) 1> brpop list 0 1) "list" 2) "hello"> brpop list 0 //阻塞在这里 /* ---------------------------------------------------- */ //当我在另一个客户端lpush一个元素之后,客户端输出为> brpop list 0 1) "list" 2) "world" (50.60s)//阻塞的时间
//阻塞状态 typedef struct blockingState { /* Generic fields. */ mstime_t timeout; /* 超时时间 */ /* REDIS_BLOCK_LIST */ dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ /* REDIS_BLOCK_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */ } blockingState; //继续列表 typedef struct readyList { redisDb *db;//就绪键所在的数据库 robj *key;//就绪键 } readyList; //客户端有关属性 typedef struct redisClient { int btype; /* Type of blocking op if REDIS_BLOCKED. */ blockingState bpop; /* blocking state */ } //服务器有关属性 struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ } //数据库有关属性 typedef struct redisDb { //keys->redisCLient映射 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *ready_keys; /* Blocked keys that received a PUSH */ }redisDB
void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } //++++++++++++++++++++++++++++++++++++++++++++++++++ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; mstime_t timeout; int j; if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) != REDIS_OK) return;//将超时时间保存在timeout中 for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表 if (o != NULL) {//如果不为空 if (o->type != REDIS_LIST) {//不是链表类型 addReply(c,shared.wrongtypeerr);//报错 return; } else { if (listTypeLength(o) != 0) {//链表不为空 /* Non empty list, this is like a non normal [LR]POP. */ char *event = (where == REDIS_HEAD) ? "lpop" : "rpop"; robj *value = listTypePop(o,where);//从链表中pop出一个元素 redisAssert(value != NULL); //给客户端发送pop出来的元素信息 addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event, c->argv[j],c->db->id); if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表 dbDelete(c->db,c->argv[j]); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del", c->argv[j],c->db->id); } /* 省略一部分 */ } } } } /* 如果链表为空,则阻塞客户端 */ blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); }
从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { dictEntry *de; list *l; int j; c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空 if (target != NULL) incrRefCount(target);//不为空,增加引用计数 for (j = 0; j < numkeys; j++) { /* 将阻塞的key存入c.bpop.keys字典中 */ if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ //将阻塞的key和客户端添加进c->db->blocking_keys de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); redisAssertWithInfo(c,keys[j],retval == DICT_OK); } else { l = dictGetVal(de); } listAddNodeTail(l,c);//添加到阻塞键的客户点链表中 } blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志 }
void blockClient(redisClient *c, int btype) { c->flags |= REDIS_BLOCKED;//设置标志 c->btype = btype;//阻塞操作类型 server.bpop_blocked_clients++; }
if (listLength(server.ready_keys)) handleClientsBlockedOnLists();
//db.c void dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库 redisAssertWithInfo(NULL,key,retval == REDIS_OK); //判断是否为链表类型,如果是,调用有链表已经ready函数 if (val->type == REDIS_LIST) signalListAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); } //t_list.c void signalListAsReady(redisDb *db, robj *key) { readyList *rl; /* 没有客户端阻塞在这个键上,则直接返回. */ if (dictFind(db->blocking_keys,key) == NULL) return; /* 这个键已近被唤醒了,所以没必要重新入队 */ if (dictFind(db->ready_keys,key) != NULL) return; /* Ok, 除了上述两情况,把这个键放入server.ready_keys */ rl = zmalloc(sizeof(*rl)); rl->key = key; rl->db = db; incrRefCount(key); listAddNodeTail(server.ready_keys,rl);//添加链表末尾 /* We also add the key in the db->ready_keys dictionary in order * to avoid adding it multiple times into a list with a simple O(1) * check. */ incrRefCount(key); //同时将这个阻塞键放入db->ready_keys redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); }
void handleClientsBlockedOnLists(void) { while(listLength(server.ready_keys) != 0) { list *l; /* 将server.ready_keys赋给一个新的list,再将server.ready_keys清空 */ l = server.ready_keys; server.ready_keys = listCreate(); /* 迭代每一个就绪的每一个readyList */ while(listLength(l) != 0) { listNode *ln = listFirst(l);//获取第一个就绪readyList readyList *rl = ln->value; /* 从rl所属的数据库中删除rl */ dictDelete(rl->db->ready_keys,rl->key); /* 查询rl所属的数据库查找rl->key ,给阻塞客户端回复rl->key链表中的第一个元素*/ robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == REDIS_LIST) { dictEntry *de; /* 在rl->db->blocking_keys查找阻塞在rl->key的客户端链表 */ de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de);//转换为客户端链表 int numclients = listLength(clients); while(numclients--) {//给每个客户端发送消息 listNode *clientnode = listFirst(clients); redisClient *receiver = clientnode->value;//阻塞的客户端 robj *dstkey = receiver->bpop.target;//brpoplpush命令目的链表 int where = (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand) ? REDIS_HEAD : REDIS_TAIL;//获取取出的方向 robj *value = listTypePop(o,where);//取出就绪链表的元素 if (value) { /* Protect receiver->bpop.target, that will be * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); unblockClient(receiver);//设置客户端为非阻塞状态 if (serveClientBlockedOnList(receiver, rl->key,dstkey,rl->db,value, where) == REDIS_ERR) { /* If we failed serving the client we need * to also undo the POP operation. */ listTypePush(o,value,where); }//给客户端回复链表中的元素内容 if (dstkey) decrRefCount(dstkey); decrRefCount(value); } else { break; } } } //如果链表为空,则从数据库中删除 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key); /* We don't call signalModifiedKey() as it was already called * when an element was pushed on the list. */ } /* 回收rl */ decrRefCount(rl->key); zfree(rl); listDelNode(l,ln); } listRelease(l); /* We have the new list on place at this point. */ } }
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) { robj *argv[3]; if (dstkey == NULL) { /* Propagate the [LR]POP operation. */ argv[0] = (where == REDIS_HEAD) ? shared.lpop : shared.rpop; argv[1] = key; propagate((where == REDIS_HEAD) ? server.lpopCommand : server.rpopCommand, db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); /* BRPOP/BLPOP */ addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,value); } else { /* BRPOPLPUSH */ /* 省略 */ } }
propagate函数主要是将命令信息发送给aof和slave。函数中省略部分是brpoplpush list list1 0
int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) { time_t now = now_ms/1000; //.......... else if (c->flags & REDIS_BLOCKED) { /* Blocked OPS timeout is handled with milliseconds resolution. * However note that the actual resolution is limited by * server.hz. */ if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { /* Handle blocking operation specific timeout. */ replyToBlockedClientTimedOut(c); unblockClient(c); } } //.............