Stream
稳定度: 2 - 稳定
流是一个被io.js
内部的许多对象所实现的抽象接口。例如一个发往 HTTP 服务器的请求是一个留,stdout
也是一个流。流可以是可读的,可写的或双向的。所有的流都是EventEmitter
实例。
你可以通过require('stream')
来取货Stream
的基类。其中包括了Readable
流,Writable
流,Duplex
流和Transform
流的基类。
此文档分为三个章节。第一章节解释了在你的编程中使用流时需要的 API。如果你不需要实现你自己的流式 API,你可以在这里停止。
第二章节解释了你在构建你自己的流时需要的 API,这些 API 是为了方便你这么做而设计的。
第三章节深入讲述了流的工作机制,包括一些内部的机制和函数,你不应该去改动它们除非你知道你在做什么。
面向流消费者的 API
流可以是可读的,可写的,或双工的。
所有的流都是EventEmitters
。但是它们也各自有一些独特的方法和属性,这取决于它们是可读流,可写流或双工流。
如果一个流同时是可读的和可写的,那么表示它实现了以下所有的方法和事件。所以,这些 API 同时也涵盖Duplex
或Transform
流,即使它们的实现可能有些不同。
在你程序中,为了消费流而去实现流接口不是必须的。如果你确实正在你的程序中实现流接口,请参考下一章节面向流实现者的API
。
几乎所有io.js
程序,不论多简单,都使用了流。下面是一个在io.js
是使用流的例子:
var http = require('http');
var server = http.createServer(function (req, res) {
// req is an http.IncomingMessage, which is a Readable Stream
// res is an http.ServerResponse, which is a Writable Stream
var body = '';
// we want to get the data as utf8 strings
// If you don't set an encoding, then you'll get Buffer objects
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added
req.on('data', function (chunk) {
body += chunk;
});
// the end event tells you that you have entire body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('error: ' + er.message);
}
// write back something interesting to the user:
res.write(typeof data);
res.end();
});
});
server.listen(1337);
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
Class: stream.Readable
可读流接口是一个你可以从之读取数据的数据源的抽象。换句话说,数据从可读流而来。
除非你指示已经准备好接受数据,否则可读流不会开始发生数据。
可读流有两个“模式”:流动模式和暂停模式。当在流动模式时,数据由底层系统读出,并且会尽快地提供给你的程序。当在暂停模式时,你必须调用stream.read()
方法来获取数据块。流默认是暂停模式。
注意:如果data
事件没有被绑定监听器,并且没有导流(pipe)目标,并且流被切换到了流动模式,那么数据将会被丢失。
你可以通过下面任意一个做法切换到流动模式:
-
添加一个
data
事件的监听器来监听数据。 -
调用
resume()
方法来明确开启流动模式。 - 调用
pipe()
方法将数据导入一个可写流。
你可以同意下面任意一种方法切换回暂停模式:
-
如果没有导流(pipe)目标,调用
pause()
方法。 - 如果有导流(pipe)目标,移除所有的
data
事件监听器,并且通过unpipe()
方法移除所有导流目标。
注意,由于为了向后兼任的原因,移除data
事件的监听器将不会自动暂停流。同样的,如果有导流目标,调用pause()
方法将不会保证目标流排空并请求更多数据时保持暂停。
一些内置的可读流例子:
- 客户端的 HTTP 请求
- 服务端的 HTTP 响应
- 文件系统读取流
zlib
流crypto
流- tcp sockets
- 子进程的 stdout 和 stderr
process.stdin
Event: 'readable'
当一个数据块能可以从流中被读出时,会触发一个readable
事件。
某些情况下,监听一个readable
事件会导致一些将要被读出的数据从底层系统进入内部缓冲,如果它没有准备好。
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
// there is some data to read now
});
当内部缓冲被排空时,一旦有更多数据,readable
事件会再次触发。
Event: 'data'
- chunk Buffer | String 数据块
为一个没有被暂停的流添加一个data
事件的监听器会使其切换到流动模式。之后数据会被尽快得传递给用户。
如果你只是想尽快得从流中取得所有数据,这是最好的方式。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
Event: 'end'
当没有更多可读的数据时这个事件会被触发。
注意,除非数据被完全消费,end
事件才会触发。这可以通过切换到流动模式,或重复调用read()
方法。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
console.log('there will be no more data.');
});
Event: 'close'
当底层资源(如源头的文件描述符)被关闭时触发。不是所有的流都会触发这个事件。
Event: 'error'
- Error Object
当接受数据时有错误发生,会触发此事件。
readable.read([size])
- size Number 可选,指定读取数据的数量
- Return String | Buffer | null
read()
方法从内部缓冲中取出数据并返回它。如果没有可用数据,那么将返回null
。
如果你传递了一个size
参数,那么它将返回指定字节的数据。如果size
参数的字节数不可用,那么将返回null
。
如果你不指定size
参数,那么将会返回内部缓冲中的所有数据。
这个方法只能在暂定模式中被调用。在流动模式下,这个方法会被自动地重复调用,知道内部缓冲被排空。
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('got %d bytes of data', chunk.length);
}
});
如果这个方法返回一个数据块,那么它也会触发data
事件。
readable.setEncoding(encoding)
- encoding String 使用的编码
- Return: this
调用这个函数会导致流返回指定编码的字符串而不是Buffer
对象。例如,如果你调用readable.setEncoding('utf8')
,那么输出的数据将被解释为 UTF-8 数据,并且作为字符串返回。如果你调用了readable.setEncoding('hex')
,那么数据将被使用十六进制字符串的格式编码。
该方法可以正确地处理多字节字符。如果你只是简单地直接取出缓冲并且对它们调用buf.toString(encoding)
,将会导致错位。如果你想使用字符串读取数据,请使用这个方法。
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
readable.resume()
- Return: this
这个方法将会让可读流继续触发data
事件。
这个方法将会使流切换至流动模式。如果你不想消费流中的数据,但你想监听它的end
事件,你可以通过调用readable.resume()
来打开数据流。
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function() {
console.log('got to the end, but did not read anything');
});
readable.pause()
- Return: this
这个方法会使一个处于流动模式的流停止触发data
事件,并切换至暂停模式。所有可用的数据将仍然存在于内部缓冲中。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
readable.pause();
console.log('there will be no more data for 1 second');
setTimeout(function() {
console.log('now data will start flowing again');
readable.resume();
}, 1000);
});
readable.isPaused()
- Return: Boolean
这个方法会返回流是否被客户端代码所暂停(调用readable.pause()
,并且没有在之后调用readable.resume()
)。
var readable = new stream.Readable
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pipe(destination[, options])
- destination Writable Stream 写入数据的目标
- options Object
- end Boolean 当读取者结束时结束写入者。默认为
true
。
- end Boolean 当读取者结束时结束写入者。默认为
这个方法会取出可读流中所有的数据,并且将之写入指定的目标。这个方法会自动调节流量,所以当快速读取可读流时目标不会溢出。
可以将数据安全地导流至多个目标。
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
这个函数返回目标流,所以你可以链式调用pipe()
:
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
例子,模仿 UNIX 的cat
命令:
process.stdin.pipe(process.stdout);
默认情况下,当源流触发end
事件时,目标流会被调用end()
方法,然后目标就不再是可写的了。将传递{ end: false }
作为options
参数,将保持目标流开启。
例子,保持被写入的流开启,所以“Goodbye”可以在末端被写入:
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
注意,不论指定任何options
参数,process.stderr
和process.stdout
在程序退出前永远不会被关闭。
readable.unpipe([destination])
- destination Writable Stream 可选,指定解除导流的流
这方法会移除之前调用pipe()
方法所设置的钩子。
如果没有指定目标,那么所有的导流都会被移除。
如果指定了目标,但是并没有为目标设置导流,那么什么都不会发生。
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
console.log('stop writing to file.txt');
readable.unpipe(writable);
console.log('manually close the file stream');
writable.end();
}, 1000);
readable.unshift(chunk)
- chunk Buffer | String 要插回读取队列开头的数据块。
该方法在许多场景中都很有用,比如一个流正在被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”回来源,以便流能将它传递给其它消费者。
如果你发现你必须经常在你的程序中调用stream.unshift(chunk)
,你应该考虑实现一个Transform
流(参阅下文的面向流实现者的API)。
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
readable.wrap(stream)
- stream Stream 一个“旧式”可读流
Node.js
v0.10 以及之前版本的流没有完全包含如今的所有的流 API(更多的信息请参阅下文的“兼容性”)。
如果你正在使用一个老旧的io.js
库,它触发data
时间并且有一个仅作查询用途的pause()
方法,那么你可以调用wrap()
方法来创建一个使用“旧式”流作为数据源的可读流。
你几乎不会用到这个函数,它的存在仅是为了老旧的io.js
程序和库交互。
例子:
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);
myReader.on('readable', function() {
myReader.read(); // etc.
});
Class: stream.Writable
可写流接口是一个你可以向其写入数据的目标的抽象。
一些内部的可写流例子:
- 客户端的 http 请求
- 服务端的 http 响应
- 文件系统写入流
zlib
流crypto
流- tcp
socket
- 子进程
stdin
process.stdout
,process.stderr
writable.write(chunk[, encoding][, callback])
- chunk String | Buffer 要写入的数据
- encoding String 编码,如果数据块是字符串
- callback Function 当数据块写入完毕后调用的回调函数
- Returns: Boolean 如果被全部处理则返回
true
该方法向底层系统写入数据,并且当数据被全部处理后调用指定的回调函数。
返回值指示了你是否可以立刻写入数据。如果数据需要被内部缓冲,会返回false
。否则返回true
。
返回值经供参考。即使返回false
,你仍可以继续写入数据。但是,写入的数据将会被缓冲在内存里,所以最好不要这样做。应该在写入更多数据前等待drain
事件。
Event: 'drain'
如果一个writable.write(chunk)
调用返回了false
,那么drain
事件会指示出可以继续向流写入数据的时机。
// Write the data to the supplied writable stream 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write);
}
}
}
writable.cork()
强制滞留所有写入。
滞留的数据会在调用.uncork()
或.end()
方法后被写入。
writable.uncork()
写入在调用.cork()
方法所有被滞留的数据。
writable.setDefaultEncoding(encoding)
- encoding String 新的默认编码
设置一个可写流的默认编码。
writable.end([chunk][, encoding][, callback])
- chunk String | Buffer 可选,写入的数据
- encoding String 编码,如果数据块是字符串
- callback Function 可选,回调函数
当没有更多可写的数据时,调用这个方法。如果指定了回调函数,那么会被添加为finish
事件的监听器。
在调用了end()
后调用write()
会导致一个错误。
// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!
Event: 'finish'
当调用了end()
方法,并且所有的数据都被写入了底层系统,这个事件会被触发。
var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
console.error('all writes are now complete.');
});
Event: 'pipe'
- src Readable Stream 对这个可写流进行导流的源可读流
这个事件将会在可读流被一个可写流使用pipe()
方法进行导流时触发。
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
console.error('something is piping into the writer');
assert.equal(src, reader);
});
reader.pipe(writer);
Event: 'unpipe'
- src Readable Stream 对这个可写流停止导流的源可读流
当可读流对其调用unpipe()
方法,在源可读流的目标集合中删除这个可写流,这个事件将会触发。
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
console.error('something has stopped piping into the writer');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
Event: 'error'
- Error object
在写入数据或导流发生错误时触发。
Class: stream.Duplex
双工是同时实现了可读流与可写流的借口。它的用处请参阅下文。
内部双工流的例子:
- tcp
socket
zlib
流crypto
流
Class: stream.Transform
转换流是一种输出由输入计算所得的栓共流。它们同时集成了可读流与可写流的借口。它们的用处请参阅下文。
内部转换流的例子:
zlib
流crypto
流
面向流实现者的 API
实现所有种类的流的模式都是一样的:
- 为你的子类继承合适的父类(
util.inherits
非常合适于做这个)。 - 为了保证内部机制被正确初始化,在你的构造函数中调用合适的父类构造函数。
- 实现一个或多个特定的方法,参阅下文。
被扩展的类和要实现的方法取决于你要编写的流类的类型:
用途 | 类 | 需要实现的方法 |
---|---|---|
只读 | Readable | _read |
只写 | Writable | _write, _writev |
可读以及可写 | Duplex | _read, _write, _writev |
操作被写入数据,然后读出结果 | Transform | _transform, _flush |
在你的实现代码中,非常重要的一点是永远不要调用上文的面向流消费者的 API。否则,你在程序中消费你的流接口时可能有潜在的副作用。
Class: stream.Readable
stream.Readable
是一个被设计为需要实现底层的_read(size)
方法的抽象类。
请参阅上文的面向流消费者的 API 来了解如何在程序中消费流。以下解释了如果在你的程序中实现可读流。
例子:一个计数流
这是一个可读流的基础例子。它从1到1,000,000递增数字,然后结束。
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000000;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
if (i > this._max)
this.push(null);
else {
var str = '' + i;
var buf = new Buffer(str, 'ascii');
this.push(buf);
}
};
例子:简单协议 v1 (次优)
这类似于上文中提到的parseHeader
函数,但是使用一个自定义流实现。另外,注意这个实现不将流入的数据转换为字符串。
更好地实现是作为一个转换流实现,请参阅下文更好地实现。
// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under the Transform section.
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(SimpleProtocol, Readable);
function SimpleProtocol(source, options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(source, options);
Readable.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
// source is a readable stream, such as a socket or file
this._source = source;
var self = this;
source.on('end', function() {
self.push(null);
});
// give it a kick whenever the source is readable
// read(0) will not consume any bytes
source.on('readable', function() {
self.read(0);
});
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._read = function(n) {
if (!this._inBody) {
var chunk = this._source.read();
// if the source doesn't have data, we don't have data yet.
if (chunk === null)
return this.push('');
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
this.push('');
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// now, because we got some extra data, unshift the rest
// back into the read queue so that our consumer will see it.
var b = chunk.slice(split);
this.unshift(b);
// and let them know that we are done parsing the header.
this.emit('header', this.header);
}
} else {
// from there on, just provide the data to our consumer.
// careful not to push(null), since that would indicate EOF.
var chunk = this._source.read();
if (chunk) this.push(chunk);
}
};
// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
new stream.Readable([options])
- options Object
- highWaterMark Number 在停止从底层资源读取之前,在内部缓冲中存储的最大字节数。默认为16kb,对于
objectMode
则是16 - encoding String 如果被指定,那么缓冲将被利用指定编码解码为字符串,默认为
null
- objectMode Boolean 是否该流应该表现如一个对象的流。意思是说
stream.read(n)
返回一个单独的对象而不是一个大小为n
的Buffer
,默认为false
- highWaterMark Number 在停止从底层资源读取之前,在内部缓冲中存储的最大字节数。默认为16kb,对于
在实现了Readable
类的类中,请确保调用了Readable
构造函数,这样缓冲设置才能被正确的初始化。
readable._read(size)
- size Number 异步读取数据的字节数
注意:实现这个函数,而不要直接调用这个函数。
这个函数不应该被直接调用。它应该被子类实现,并且仅被Readable
类的内部方法调用。
所有的可读流都必须实现这个方法用来从底层资源中获取数据。
这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。
当数据可用时,调用readable.push(chunk)
方法将之推入读取队列。如果方法返回false
,那么你应当停止读取。当_read
方法再次被调用,你应当推入更多数据。
参数size
仅作查询。“read”调用返回数据的实现可以通过这个参数来知道应当抓取多少数据;其余与之无关的实现,比如 TCP 或 TLS,则可忽略这个参数,并在可用时返回数据。例如,没有必要“等到”size
个字节可用时才调用stream.push(chunk)
。
readable.push(chunk[, encoding])
- chunk Buffer | null | String 被推入读取队列的数据块
- encoding String 字符串数据块的编码。必须是一个合法的
Buffer
编码,如'utf8'或'ascii' - return Boolean 是否应该继续推入
注意:这个函数应该被Readable
流的实现者调用,而不是消费者。
_read()
函数在至少调用一次push(chunk)
方法前,不会被再次调用。
Readable
类通过在readable
事件触发时,调用read()
方法将数据推入 之后用于读出数据的读取队列 来工作。
push()
方法需要明确地向读取队列中插入数据。如果它的参数为null
,那么它将发送一个数据结束信号(EOF
)。
这个 API 被设计为尽可能的灵活。例如,你可能正在包装一个有pause/resume
机制和一个数据回调函数的低级别源。那那些情况下,你可以通过以下方式包装这些低级别源:
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
util.inherits(SourceWrapper, Readable);
function SourceWrapper(options) {
Readable.call(this, options);
this._source = getLowlevelSourceObject();
var self = this;
// Every time there's data, we push it into the internal buffer.
this._source.ondata = function(chunk) {
// if push() returns false, then we need to stop reading from source
if (!self.push(chunk))
self._source.readStop();
};
// When the source ends, we push the EOF-signaling `null` chunk
this._source.onend = function() {
self.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
this._source.readStart();
};
Class: stream.Writable
stream.Writable
是一个被设计为需要实现底层的_write(chunk, encoding, callback)
方法的抽象类。
请参阅上文的面向流消费者的 API 来了解如何在程序中消费流。以下解释了如果在你的程序中实现可写流。
new stream.Writable([options])
- options Object
- highWaterMark Number
write()
方法开始返回false
的缓冲级别。默认为16 kb,对于objectMode
流则是16
- decodeStrings Boolean 是否在传递给
write()
方法前将字符串解码成Buffer
。默认为true
- objectMode Boolean 是否
write(anyObj)
为一个合法操作。如果设置为true
你可以写入任意数据而不仅是Buffer
或字符串数据。默认为false
- highWaterMark Number
在实现了Writable
类的类中,请确保调用了Writable
构造函数,这样缓冲设置才能被正确的初始化。
writable._write(chunk, encoding, callback)
- chunk Buffer | String 将要被写入的数据块。除非
decodeStrings
配置被设置为false
,否则将一直是一个buffer
- encoding String 如果数据块是一个字符串,那么这就是编码的类型。如果是一个
buffer
,那么则会忽略它 - callback Function 当你处理完给定的数据块后调用这个函数
所有的Writable
流的实现都必须提供一个_write()
方法来给底层资源传输数据。
这个函数不应该被直接调用。它应该被子类实现,并且仅被Writable
类的内部方法调用。
回调函数使用标准的callback(error)
模式来表示这个写操作成功或发生了错误。
如果构造函数选项中设置了decodeStrings
标志,那么数据块将是一个字符串而不是一个Buffer
,编码将会决定字符串的类型。这个是为了帮助处理编码字符串的实现。如果你没有明确地将decodeStrings
选项设为false
,那么你会安全地忽略encoding
参数,并且数据块是Buffer
形式。
这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。
writable._writev(chunks, callback)
- chunks Array 将被写入的数据块数组。其中每一个数据都有如下格式:
{ chunk: ..., encoding: ... }
- callback Function 当你处理完给定的数据块后调用这个函数
注意:这个函数不应该被直接调用。它应该被子类实现,并且仅被Writable
类的内部方法调用。
这个函数对于你的实现是完全可选的。大多数情况下它是不必的。如果实现,它会被以所有滞留在写入队列中的数据块调用。
Class: stream.Duplex
一个“双工”流既是可读的,又是可写的。如 TCPsocket
连接。
注意,和你实现Readable
或Writable
流时一样,stream.Duplex
是一个被设计为需要实现底层的_read(size)
和_write(chunk, encoding, callback)
方法的抽象类。
由于JavaScript
并不具备多继承能力,这个类是继承于Readable
类,并寄生于Writable
类。所以为了实现这个类,用户需要同时实现低级别的_read(n)
方法和低级别的_write(chunk, encoding, callback)
方法。
new stream.Duplex(options)
- options Object 同时传递给
Writable
和Readable
构造函数。并且包含以下属性:- allowHalfOpen Boolean 默认为
true
。如果设置为false
,那么流的可读的一端结束时可写的一端也会自动结束,反之亦然。 - readableObjectMode Boolean 默认为
false
,为流的可读的一端设置objectMode
。当objectMode
为true
时没有效果。 - writableObjectMode Boolean 默认为
false
,为流的可写的一端设置objectMode
。当objectMode
为true
时没有效果。
- allowHalfOpen Boolean 默认为
在实现了Duplex
类的类中,请确保调用了Duplex
构造函数,这样缓冲设置才能被正确的初始化。
Class: stream.Transform
“转换”流是一个输出于输入存在对应关系的双工流,如一个zilib
流或一个crypto
流。
输出和输出并不需要有相同的大小,相同的数据块数或同时到达。例如,一个哈希流只有一个单独数据块的输出当输入结束时。一个zlib
流的输出比其输入小得多或大得多。
除了实现_read()
方法和_write()
方法,转换流还必须实现_transform()
方法,并且可选地实现_flush()
方法(参阅下文)。
new stream.Transform([options])
- options Object 同时传递给
Writable
和Readable
构造函数。
在实现了Transform
类的类中,请确保调用了Transform
构造函数,这样缓冲设置才能被正确的初始化。
transform._transform(chunk, encoding, callback)
- chunk Buffer | String 将要被写入的数据块。除非
decodeStrings
配置被设置为false
,否则将一直是一个buffer
- encoding String 如果数据块是一个字符串,那么这就是编码的类型。如果是一个 buffer,那么则会忽略它
- callback Function 当你处理完给定的数据块后调用这个函数
这个函数不应该被直接调用。它应该被子类实现,并且仅被Transform
类的内部方法调用。
所有Transform
流的实现都必须提供一个_transform
方法来接受输入和产生输出。
在Transform
类中,_transform
可以做需要做的任何事,如处理需要写入的字节,将它们传递给可写端,异步 I/O,等等。
调用transform.push(outputChunk)
0次或多次来从输入的数据块产生输出,取决于你想从这个数据块中输出多少数据作为结果。
仅当目前的数据块被完全消费后,才会调用回调函数。注意,对于某些特殊的输入可能会没有输出。如果你将数据作为第二个参数传入回调函数,那么数据将被传递给push
方法。换句话说,下面的两个例子是相等的:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data);
callback();
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data);
}
这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。
transform._flush(callback)
- callback Function 当你排空了所有剩余数据后,这个回调函数会被调用
注意:这个函数不应该被直接调用。它应该被子类实现,并且仅被Transform
类的内部方法调用。
在一些情景中,你的转换操作需要在流的末尾多发生一点点数据。例如,一个Zlib
压缩流会存储一些内部状态以便它能优化压缩输出。但是在最后,它需要尽可能好得处理这些留下的东西来使数据完整。
在这种情况中,您可以实现一个_flush
方法,它会在最后被调用,在所有写入数据被消费、但在触发end
表示可读端到达末尾之前。和_transform
一样,只需在写入操作完成时适当地调用transform.push(chunk)
零或多次。
这个函数有一个下划线前缀,因为它对于类是内部的,并应该直接被用户的程序调用。你应在你的拓展类里覆盖这个方法。
Events: 'finish' 和 'end'
finish
和end
事件分别来自于父类Writable
和Readable
。finish
事件在end()
方法被调用以及所有的输入被_transform
方法处理后触发。end
事件在所有的在_flush
方法的回调函数被调用后的数据被输出后触发。
Example: SimpleProtocol 解释器 v2
上文中的简单协议解释器可以简单地通过高级别的Transform
流更好地实现。与上文例子中的parseHeader
和SimpleProtocol v1
相似。
在这个例子中,没有从参数中提供输入,然后将它导流至解释器中,这更符合io.js
的使用习惯。
var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);
function SimpleProtocol(options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options);
Transform.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// and let them know that we are done parsing the header.
this.emit('header', this.header);
// now, because we got some extra data, emit this first.
this.push(chunk.slice(split));
}
} else {
// from there on, just provide the data to our consumer as-is.
this.push(chunk);
}
done();
};
// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
Class: stream.PassThrough
这是一个Transform
流的实现。将输入的流简单地传递给输出。它的主要目的是用来演示和测试,但它在某些需要构建特殊流的情况下可能有用。
简化的构造器 API
可以简单的构造流而不使用继承。
这可以通过调用合适的方法作为构造函数和参数来实现:
例子:
Readable
var readable = new stream.Readable({
read: function(n) {
// sets this._read under the hood
}
});
Writable
var writable = new stream.Writable({
write: function(chunk, encoding, next) {
// sets this._write under the hood
}
});
// or
var writable = new stream.Writable({
writev: function(chunks, next) {
// sets this._writev under the hood
}
});
Duplex
var duplex = new stream.Duplex({
read: function(n) {
// sets this._read under the hood
},
write: function(chunk, encoding, next) {
// sets this._write under the hood
}
});
// or
var duplex = new stream.Duplex({
read: function(n) {
// sets this._read under the hood
},
writev: function(chunks, next) {
// sets this._writev under the hood
}
});
Transform
var transform = new stream.Transform({
transform: function(chunk, encoding, next) {
// sets this._transform under the hood
},
flush: function(done) {
// sets this._flush under the hood
}
});
流:内部细节
缓冲
Writable
流和Readable
流都会分别在一个内部的叫_writableState.buffer
或_readableState.buffer
的对象里缓冲数据。
潜在的被缓冲的数据量取决于被传递给构造函数的highWaterMark
参数。
在Readable
流中,当其的实现调用stream.push(chunk)
时就会发生缓冲。如果流的消费者没有调用stream.read()
,那么数据就会保留在内部队列中直到它被消费。
在Writable
流中,当用户重复调用stream.write(chunk)
时就会发生缓冲,甚至是当write()
返回false
时。
流,尤其是pipe()
方法的初衷,是限制数据的滞留量在一个可接受的水平,这样才使得不同传输速度的来源和目标不会淹没可用的内存。
stream.read(0)
在一些情况下,你想不消费任何数据而去触发一次底层可读流机制的刷新。你可以调用stream.read(0)
,它总是返回null
。
如果内部的读缓冲量在highWaterMark
之下,并且流没有正在读取,那么调用read(0)
将会触发一次低级别的_read
调用。
几乎永远没有必须这么做。但是,你可能会在io.js
的Readable
流类的内部代码的几处看到这个。
stream.push('')
推入一个0字节的字符串或Buffer
(不处于对象模式)有一个有趣的副作用。因为这是一个stream.push()
的调用,它将会结束读取进程。但是,它不添加任何数据到可读缓冲中,所以没有任何用户可消费的数据。
在极少的情况下,你当下没有数据可以提供,但你的消费者同过调用stream.read(0)
来得知合适再次检查。在这样的情况下,你可以调用stream.push('')
。
至今为止,这个功能的唯一使用之处是在tls.CryptoStream
类中,它将在io.js
的1.0版本中被废弃。如果你发现你不得不使用stream.push('')
,请考虑使用另外的方式。因为这几乎表示发生了某些可怕的错误。
与旧版本的Node.js
的兼容性
在Node.js
的0.10版本之前,可读流接口非常简单,并且功能和功用都不强。
data
事件会立刻触发,而不是等待你调用read()
方法。如果你需要进行一些I/O
操作来决定是否处理数据,那么你只能将数据存储在某些缓冲区中以防数据流失。pause()
仅供查询,并不保证生效。这意味着你还是要准备接收data
事件在流已经处于暂停模式中时。
在io.js
v1.0 和Node.js
v0.10中,下文所述的Readable
类添加进来。为了向后兼容性,当一个data
事件的监听器被添加时或resume()
方法被调用时,可读流切换至流动模式。其作用是,即便您不使用新的read()
方法和readable
事件,您也不必担心丢失数据块。
大多数程序都会保持功能正常,但是,以下有一些边界情况:
- 没有添加任何
data
事件 - 从未调用
resume()
方法 - 流没有被导流至任何可写的目标
例如,考虑以下代码:
// WARNING! BROKEN!
net.createServer(function(socket) {
// we add an 'end' method, but never consume the data
socket.on('end', function() {
// It will never get here.
socket.end('I got your message (but didnt read it)\n');
});
}).listen(1337);
在Node.js
v0.10前,到来的信息数据会被简单地丢弃。但是在io.js
v1.0 和Node.js
v0.10后,socket
会被永远暂停。
解决方案是调用resume()
方法来开启数据流:
// Workaround
net.createServer(function(socket) {
socket.on('end', function() {
socket.end('I got your message (but didnt read it)\n');
});
// start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了新的Readable
流切换至流动模式之外,在v0.10之前的流可以被使用wrap()
方法包裹。
对象模式
通常情况下,流仅操作字符串和Buffer
。
处于对象模式中的流除了Buffer
和字符串外,还能读出普通的JavaScirpt
值。
处于对象模式中的可读流在调用stream.read(size)
后只会返回单个项目,不论size
参数是什么。
处于对象模式中的可写流总是忽略stream.write(data, encoding)
中的encoding
参数。
对于处于对象模式中的流,特殊值null
仍然保留它的特殊意义。也就是说,对于对象模式的可读流,stream.read()
返回一个null
仍意味着没有更多的数据了,并且stream.push(null)
会发送一个文件末端信号(EOF
)。
核心io.js
中没有流是对象模式的。这个模式仅仅供用户的流库使用。
你应当在子类的构造函数的options
参数对象中设置对象模式。在流的过程中设置对象模式时不安全的。
对于双工流,可以分别得通过readableObjectMode
和writableObjectMode
设置可读端和可写端。这些配置可以被用来通过转换流实现解释器和序列化器。
var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);
// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream() {
if (!(this instanceof JSONParseStream))
return new JSONParseStream();
Transform.call(this, { readableObjectMode : true });
this._buffer = '';
this._decoder = new StringDecoder('utf8');
}
JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
this._buffer += this._decoder.write(chunk);
// split on newlines
var lines = this._buffer.split(/\r?\n/);
// keep the last partial line buffered
this._buffer = lines.pop();
for (var l = 0; l < lines.length; l++) {
var line = lines[l];
try {
var obj = JSON.parse(line);
} catch (er) {
this.emit('error', er);
return;
}
// push the parsed object out to the readable consumer
this.push(obj);
}
cb();
};
JSONParseStream.prototype._flush = function(cb) {
// Just handle any leftover
var rem = this._buffer.trim();
if (rem) {
try {
var obj = JSON.parse(rem);
} catch (er) {
this.emit('error', er);
return;
}
// push the parsed object out to the readable consumer
this.push(obj);
}
cb();
};