node stream

定义:当涉及到持续不断的对数据进行读写时,流就出现了,在一个应用程序中,流是一组有序的,有起点和终点的字节数据(流数据)的传输手段。

先将该对象中所包含的数据转化为字节数据也就是流数据。再通过流的传输,到达目的对象后,再将流数据转化为该对象中可以使用的数据。

1、处理数据有两种模式

buffer模式:取完数据一次性操作

stream模式:边取数据边操作

2、node 中流的类型

四种基本流类型:

  • Readable 可读流(fs.createReadStream())

  • Writable 可写流(fs.createWriteStream())

  • Duplex 可读可写流(双工流)(process.stdin 属性返回的流。 是net.Socket 流(也就是双工流))

  • Transform 在读写过程中可以修改和变换数据的Duplex流(转换流)

3、node fs 模块使用流

下面的代码每次可以读取可变大小的内容块,并且每次读取后会触发回调函数,不同于readFile方法要等到整个文件读取载入RAM,可用的情况下才会触发。

1
2
3
4
5
6
7
let steam = fs.createReadStream('./steam.txt')
steam.on('data', (chunk) => {
console.log(chunk, 'steam------')
})
steam.on('end', (chunk) => {
console.log(chunk, 'end-------读取完成')
})

可读流

1、可读流模式

  • 流动模式(flowing)

  • 暂停模式(paused)

所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式

  • 添加 ‘data’ 事件句柄。

  • 调用 stream.resume() 方法。

  • 调用 stream.pipe() 方法将数据发送到可写流。

可读流可以通过以下方式切换回暂停模式

  • 如果没有管道目标,则调用 stream.pause()。

  • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。

1、可读流使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

const fs = require('fs');
//创建一个文件可读流
let stream = fs.createReadStream('./steam.txt', {
//读取缓冲区的大小,默认64K
highWaterMark: 10,
encoding: 'utf8'
});
// 文件打开时触发
stream.on('open', function () {
console.log('文件打开');
});
// data事件,会让当前流切换到流动模式, 上面配置了 highWaterMark 为 10字节,所以下面会打印多次。
stream.on('data', function (data) {
console.log(data, '====');
});
//流中没有数据可供消费者时触发
stream.on('end', function () {
console.log('数据读取完毕');
});
//读取数据出错时触发
stream.on('error', function () {
console.log('读取错误');
});
//当文件被关闭时触发
stream.on('close', function () {
console.log('文件关闭');
});

2、暂停和恢复流的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

const fs = require('fs');
//创建一个文件可读流
let stream = fs.createReadStream('./steam.txt', {
//读取缓冲区的大小,默认64K
highWaterMark: 10,
encoding: 'utf8'
});
stream.on('data', function (data) {
// 暂停流的读取
stream.pause()
console.log(data, '====');
// 恢复流的读取
setTimeout(function () {
stream.resume();
}, 3000);
});

可写流

所有可写流都实现了stream.Writable类定义的接口。尽管可写流的具体实例可能略有差别,但所有的可写流都遵循同一基本的使用模式,如下:

1
2
3
4
const myStream = getWritableStreamSomehow();
myStream.write('一些数据');
myStream.write('更多数据');
myStream.end('完成写入数据');

1、可写流缓冲区

1
2
3
4
5
6
7
8
9
10
11
let stream = fs.createWriteStream('./steam.txt', {
highWaterMark: 10
});
// write方法,往流中写入数据,参数一表示要写入的数据,参数二表示编码方式,参数三表示写入成功的回调,缓冲区满时返回false,未满时返回true。

// 我们设置的缓冲区大小为 10字节,所以到写入第4个时,就返回了false。

console.log(stream.write('123', 'utf8'));
console.log(stream.write('456', 'utf8'));
console.log(stream.write('789', 'utf8'));
console.log(stream.write('10', 'utf8'));

2、可写流使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function data () {
let cnt = 52;
return function () {
let flag = true;
while (cnt && flag) {
flag = stream.write(`${cnt}`);
console.log('缓冲区中写入的字节数', stream.writableLength);
cnt--;
}
};
}
let getData = data();
getData();
// 当缓冲区中的数据满的时候,即stream.write返回false,无法再写入。
// 需要等待清空,后才可以再次写入,而'drain'事件,则是清空后触发的事件,告诉生产者可以继续写数据了。
stream.on('drain', function () {
console.log('可以继续写数据了,清空后再次写入字节数,反复操纵,直到写完全部,在这里是达到cnt的字节长度', stream.writableLength);
getData();
});
//当流或底层资源关闭时触发
stream.on('close', function () {
console.log('文件被关闭');
});
//当写入数据出错时触发
stream.on('error', function () {
console.log('写入数据错误');
});

3、可写流性能

当写入大量小块数据到流时,内部缓冲可能失效,从而导致性能下降

writable.cork()

强制把所有写入的数据都缓冲到内存中。当调用 stream.uncork() 或 stream.end() 时,缓冲的数据才会被输出。

writable.uncork()

将调用 stream.cork() 后缓冲的所有数据输出到目标。

使用 writable.cork() 和 writable.uncork() 来管理流的写入缓冲时

建议使用 process.nextTick() 来延迟调用 writable.uncork()。 通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 进行批处理。

1
2
3
4
stream.cork();
stream.write('一些 ');
stream.write('数据 ');
process.nextTick(() => stream.uncork());

如果一个流上多次调用 writable.cork(),则必须调用同样次数的 writable.uncork() 才能输出缓冲的数据。

1
2
3
4
5
6
7
8
9
stream.cork();
stream.write('一些 ');
stream.cork();
stream.write('数据 ');
process.nextTick(() => {
stream.uncork();
// 数据不会被输出,直到第二次调用 uncork()。
stream.uncork();
});

双工流

1、双工使用

1
2
3
4
5
6
7
8
9
process.stdin.setEncoding('utf8');
process.stdin.on('data', async input => {
input = input.replace(/\s/g, '') + '读取了输入内容,并且输出到控制台'
process.stdout.write(input);
});

process.stdin.on('end', () => {
process.stdout.write('结束');
});

2、可读流不可写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

const fs = require('fs');
//创建一个文件可读流
let stream = fs.createReadStream('./steam.txt', {
//读取缓冲区的大小,默认64K
highWaterMark: 10,
encoding: 'utf8'
});
// 创建一个文件可写流
// let stream = fs.createWriteStream('./steam.txt', {
// highWaterMark: 10
// });

process.stdin.setEncoding('utf8');
process.stdin.on('data', async input => {
input = input.replace(/\s/g, '') + '读取了输入内容,并且输出到控制台'
// 我们将输入内容试图写到创建的可读流里,是不会成功的。
stream.write(input);
});

process.stdin.on('end', () => {
process.stdout.write('结束');
});

3、可写流不可读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const fs = require('fs');
//创建一个文件可读流
// let stream = fs.createReadStream('./steam.txt', {
// //读取缓冲区的大小,默认64K
// highWaterMark: 10,
// encoding: 'utf8'
// });

//创建一个文件可写流
let stream = fs.createWriteStream('./steam.txt', {
highWaterMark: 10
});

process.stdin.setEncoding('utf8');
process.stdin.on('data', async input => {
input = input.replace(/\s/g, '') + '读取了输入内容,并且输出到控制台'
// 创建可写流,试图去读文件内容,是不成功的。
stream.on('data', function (data) {
console.log(data, '====');
});
});

process.stdin.on('end', () => {
process.stdout.write('结束');
});

可读流和可写流是继承自不同的类,所拥有的方法也是不同的,所以创建了可读流去执行写操作或创建了可写流去执行读操作是无法完成的

转换流

还不太明白使用场景。

管道pipe()

在可读流与可写流之间连接一个管道,完成数据间的流动

未使用管道,实现数据读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./steam.txt', {
highWaterMark: 10
});
//创建一个可写流
let ws = fs.createWriteStream('./steamR.txt', {
highWaterMark: 10
});
rs.on('data', function (data) {
let flag = ws.write(data);
console.log(`往可写流中写入 ${data.length} 字节数据`);
//如果写入缓冲区已满,则暂停可读流的读取
if (!flag) {
rs.pause();
console.log('暂停可读流');
}
});
//监控可读流数据是否读完
rs.on('end', function () {
console.log('数据已读完');
//如果可读流读完了,则调用 end() 表示可写流已写入完成
ws.end();
});
//如果可写流缓冲区已清空,可以再次写入,则重新打开可读流
ws.on('drain', function () {
rs.resume();
console.log('重新开启可读流');
});

使用pipe实现数据读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./steam.txt', {
highWaterMark: 10
});
//创建一个可写流
let ws = fs.createWriteStream('./steamR.txt', {
highWaterMark: 10
});
let ws2 = fs.createWriteStream('./steamR2.txt', {
highWaterMark: 10
});
//绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。
rs.pipe(ws);
//可以绑定多个可写流
rs.pipe(ws2);
返回
顶部