Redis 监视器
Redis 的监视机制允许某一个客户端监视 Redis 服务器的行为,这种服务对于测试来说比较有帮助。
监视机制通过 monitor 这个命令来实现。来看看它的实现:Redis 在这里只是简单讲这个客户端加到一个 redis.monitors 链表中,接着就回复 ok 给客户端。
void monitorCommand(redisClient *c) {
/* ignore MONITOR if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
listAddNodeTail(server.monitors,c);
addReply(c,shared.ok);
}
这里可以想象,当这个 Redis 服务器处理其他命令的时候,会向这个链表中的所有客户端发送通知。我们找到执行命令的核心函数 call(),可以发现确实是这么做的:
// call() 函数是执行命令的核心函数,真正执行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
int client_old_flags = c->flags;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
......
}
replicationFeedMonitors() 的实现实际上就是将命令打包好,发送给每个监视器:
// 向监视器发送数据
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid,
robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
char peerid[REDIS_PEER_ID_LEN];
struct timeval tv;
// 时间
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
// 各种不同的客户端
if (c->flags & REDIS_LUA_CLIENT) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & REDIS_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
getClientPeerId(c,peerid,sizeof(peerid));
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,peerid);
}
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == REDIS_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(REDIS_STRING,cmdrepr);
// 发送
listRewind(monitors,&li);
while((ln = listNext(&li))) {
redisClient *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}
上一篇: Redis 哨兵机制
下一篇: Redis 数据迁移