Redis 数据迁移
Redis 提供在线数据迁移的能力,把自身的数据往其他 Redis 服务器上迁移。如果需要将部分数据迁移到另一台Redis 服务器上,这个命令会非常有用。
redis migraiton 的实现比较简单。首先将需要迁移的命令打包好,发送到指定的 Redis 服务器上,回复ok 后则删除本地的键值对。
这里面用了前面讲到的 rio:读写对象既可以是文件也可以是内存,只需要安装相应的读写函数即可。这里不难理解。
网络传输部分,用到了 Redis 内部的 syncio 模块,syncio 即同步 io,每读/写入一部分数据会用 IO 多路复用的技术等待下一次可读写/的机会。在 migrateCommand() 的实现中,先用非阻塞的方式建立一个连接,接着将打包好的迁移数据发送到目标 Redis 服务器上,并等待目标 Redis 服务器的相应。
下面通过 migrateCommand() 来了解数据迁移是如何实现的:
/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
int fd;
long timeout;
long dbid;
long long ttl = 0, expireat;
robj *o;
rio cmd, payload;
// 准备需要迁移的数据,这个数据可以由客户端来指定
......
// 建立一个非阻塞连接
/* Connect */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return;
}
// 等待建立成功
if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
return;
}
// rio 的读写可以对应的是文件读写,也可以是内存的读写,只需要安装相应的读写
// 函数
// 初始化一块空的sds buffer
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
// 把需要迁移的数据打包追加到
......
// 可以指定过期时间
expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
// 生成restore 命令
......
// 生成包含 Redis 版本和校验字段的 payload
/* Finally the last argument that is the serailized object payload
* in the DUMP format. */
createDumpPayload(&payload,o);
// 写入到迁移内容中
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
// 将迁移数据送往目标 Redis 服务器
/* Tranfer the query to the other node in 64K chunks. */
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
// 最多只传送64K
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
// 同步写
nwritten = syncWrite(fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) goto socket_wr_err;
pos += nwritten;
}
}
// 读取目标 Redis 服务器的回复
/* Read back the reply. */
{
char buf1[1024];
char buf2[1024];
/* Read the two replies */
if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_rd_err;
if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_rd_err;
if (buf1[0] == '-' || buf2[0] == '-') {
addReplyErrorFormat(c,"Target instance replied with error: %s",
(buf1[0] == '-') ? buf1+1 : buf2+1);
} else {
// 回复内容正常,说明数据已经迁移成功,删除原始 Redis 服务器的key-value
robj *aux;
dbDelete(c->db,c->argv[3]);
signalModifiedKey(c->db,c->argv[3]);
addReply(c,shared.ok);
server.dirty++;
// 将变更更新到从机,并写入AOF 文件
/* Translate MIGRATE as DEL for replication/AOF. */
aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[3]);
decrRefCount(aux);
}
}
// 一些清理工作和错误处理
......
}
migrateCommand() 只是数据迁移的一部分代码,目标机器还要负责将数据存储到目标机器上,有兴趣可以参考 restoreCommand() 的实现,基本上和 migrateCommand() 是逆过来的。
上一篇: Redis 监视器
下一篇: Redis 集群(上)