返回首页 Redis 源码日志

源码日志

Redis 服务框架

Redis 基础数据结构

Redis 内功心法

Redis 应用

其他

Redis 监视器

Redis 的监视机制允许某一个客户端监视 Redis 服务器的行为,这种服务对于测试来说比较有帮助。

监视机制通过 monitor 这个命令来实现。来看看它的实现:Redis 在这里只是简单讲这个客户端加到一个 redis.monitors 链表中,接着就回复 ok 给客户端。

void monitorCommand(redisClient *c) {
    /* ignore MONITOR if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;
        c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
        listAddNodeTail(server.monitors,c);
        addReply(c,shared.ok);
}

这里可以想象,当这个 Redis 服务器处理其他命令的时候,会向这个链表中的所有客户端发送通知。我们找到执行命令的核心函数 call(),可以发现确实是这么做的:

// call() 函数是执行命令的核心函数,真正执行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    long long dirty, start = ustime(), duration;
    int client_old_flags = c->flags;
    /* Sent the command to clients in MONITOR mode, only if the commands are
    * not generated from reading an AOF. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
        {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
    ......
}

replicationFeedMonitors() 的实现实际上就是将命令打包好,发送给每个监视器:

// 向监视器发送数据
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid,
    robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j;
    sds cmdrepr = sdsnew("+");
    robj *cmdobj;
    char peerid[REDIS_PEER_ID_LEN];
    struct timeval tv;
    // 时间
    gettimeofday(&tv,NULL);
    cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
    // 各种不同的客户端
    if (c->flags & REDIS_LUA_CLIENT) {
        cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
    } else if (c->flags & REDIS_UNIX_SOCKET) {
        cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        getClientPeerId(c,peerid,sizeof(peerid));
        cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,peerid);
    }
    for (j = 0; j < argc; j++) {
        if (argv[j]->encoding == REDIS_ENCODING_INT) {
            cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
        } else {
            cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
            sdslen(argv[j]->ptr));
        }
        if (j != argc-1)
            cmdrepr = sdscatlen(cmdrepr," ",1);
        }
        cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
        cmdobj = createObject(REDIS_STRING,cmdrepr);
        // 发送
        listRewind(monitors,&li);
    while((ln = listNext(&li))) {
        redisClient *monitor = ln->value;
        addReply(monitor,cmdobj);
    }
    decrRefCount(cmdobj);
}
上一篇: Redis 哨兵机制 下一篇: Redis 数据迁移