Node.js中的流(Stream)介绍

网络编程 2021-07-04 21:47www.168986.cn编程入门
这篇文章主要介绍了Node.js中的流(Stream)介绍,本文讲解了什么是流、pipe方法、流的分类、Readable流状态的切换等内容,需要的朋友可以参考下

什么是流?

说到流,就涉及到一个nix的概念——在nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin)。

在Node中,流()的概念与之类似,代表一种数据流可供桥接的能力。

pipe

流化的精髓在于 .pipe()方法。可供桥接的能力,在于数据流的两端(上游/下游 或称为 读/写流)以一个 .pipe()方法进行桥接。

伪代码的表现形式为

代码如下:

//上游.pipe(下游)
Readable.pipe(Writable);

流的分类

这里并不打算讨论所谓的Node  v0.4 之前的“经典”流。那么,流分为这么几类(皆为抽象接口

1.    可读流(需要实现_read方法,关注点在于对数据流读取的细节
2.    可写流(需要实现_write方法,关注点在于对数据流写入的细节
3.        可读/写流(需要实现以上两接口,关注点为以上两接口的细节
4.继承自Duplex(需要实现_transform方法,关注点在于对数据块的处理

简单来说

1).pipe() 的拥有者一定具备 Readable 流(并不局限于)能力,它拥有 'readable'/'data'/'end'/'close'/'error' 一系列事件可供订阅,也提供 .read()/.pause()/.resume()等一系列方法供调用;
2).pipe() 的参数一定具备Writable 流(并不局限于 )能力,它拥有 'drain'/'pipe'/'unpipe'/'error'/'finish' 事件可供访问,也提供 .write()/.end() 等一系列方法供调用

什么鬼

有没有一丝丝焦虑?别急,做为一个说人话的低级码工,我会把Stream掰开了和您扯一扯的。

Stream类,在 里,是这么定义的

代码如下:

var EE = require('events').EventEmitter;
var util = require('util');
util.inherits(Stream, EE);
 
function Stream() {
  EE.call(this);
}

可以看出,本质上,Stream是一个EventEmitter,那意味着它具备事件驱动的功能(.emit/.on...)。众所周知,“Node.js 就是基于V8的事件驱动平台”,实现了事件驱动的流式编程,具备了和Node一样的异步回调的特征。

比如在 Readable 流中,有一个 readable 事件,在一个暂停的只读流中,只要有数据块准备好可读时,它就会被发送给订阅者(Readable 流有哪些呢?express中的 req,ftp或者mutli-form上传组件的req.part,系统中的标准输入 process.stdin等)。有了readable 事件,我们可以做个处理shell 命令输出的分析器之类的工具

代码如下:

process.stdin.on('readable', function(){
   var buf = process.stdin.read();
   if(buf){
      var data = buf.toString();
      // parsing data ...                                               
   }
});

这样调用

代码如下:

head -10 some.txt | node parser.js

对于 Readable 流,我们还可以订阅它的 data 和 end 事件,以获取数据块并在流枯竭时获得通知,如 中那样

代码如下:

req.on('connect', function(res, socket, head) {
    socket.on('data', function(chunk) {
      console.log(chunk.toString());
    });
    socket.on('end', function() {
      proxy.close();
    });
  });

Readable流状态的切换
需要注意的是,Readable 流有两种状态flowing mode(激流) 和 pause  mode(暂停)。前者根本停不下来,谁被pipe上了就马上不停的给;后者会暂停,直到下游显式的调用 Stream.read() 请求才读取数据块。Readable 流初始化时是 pause mode的。

这两种状态可以互为切换的,其中,

有以下任一行为,pause 转 flowing

1.对 Readable 流添加一个data事件订阅
2.对 Readable 调用 .resume() 显式开启flowing
3.调用 Readable 流的 .pipe(writable) ,桥接到一个 Writable 流上

有以下任一行为,flowing 转回 pause

1.Readable 流还没有 pipe 到任何流上,可调 .pause() 暂停
2.Readable 流已经 pipe 到了流上,需 remove 掉所有 data 事件订阅,并且调用 .unpipe()方法逐一解除与下游流的关系

妙用

结合流的异步特性,我可以写出这样的应用直接将 用户A 的输出桥接到 用户B 的页面上输出

代码如下:

router.post('/post', function(req, res) {
    var destination = req.headers['destination']; //发给谁
    cache[destionation] = req;
    //是的,并不返回,所以最好是个ajax请求
});

用户B请求的时候

代码如下:

router.get('/inbox', function(req, res){
    var user = req.headers['user'];
    cache.find(user, function(err, previousReq){ //找到之前存的req
       var form = new multiparty.Form();
       form.parse(previousReq);  // 有文件给我
       form.on('part', function (part) {
            part.pipe(res); //流式大法好:)
 
            part.on('error', function (err) {
                console.log(err);
                messaging.setRequestDone(uniqueID);
                return res.end(err);
            });
        });
    });
});

参考

how to write node programs with streams: stream-handbook

Copyright © 2016-2025 www.168986.cn 狼蚁网络 版权所有 Power by