NodeJs through处理流

发布时间 2023-04-26 14:56:21作者: _成飞

through2主要是基于streams2(2指的是API稳定性)封装的transform stream。其内部仅是封装了Transform的构造函数,以及更为易用的objectMode模式。

through2并未引用node默认提供的stream模块,而是使用社区中较为流行的readable-stream模块,主要是为了对之前node版本做了兼容支持。

我们可以先看一段关于transform stream使用的基本示例:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // 转换为大写、push到可读流
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

对应的如果换成through2写法。

process.stdin.pipe(through2(function(chunk, enc, callback) {
  this.push(chunk.toString().toUpperCase())
  callback()
})).pipe(process.stdout);

接下来我们再看看对象模式,默认地,stream除了接收 Buffer/String 值。还有一个对象模式(objectMode)的标识,我们可以设置以接受任意Javascript 对象。

const { Transform } = require('stream');
const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});
const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});
const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});
process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

我们在终端输入:

$ a,b,c,d
$ {"a":"b","c":"d"}

对应的如果换成through2写法。

process.stdin
  .pipe(through2.obj(function (chunk, enc, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }))
  .pipe(through2.obj(function (chunk, enc, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }))
  .pipe(through2.obj(function (chunk, enc, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }))
  .pipe(process.stdout)

 pipe中间进行处理

var through2  = require('through2');
var stream = through2(write,end)
process.stdin
    .pipe(stream)
    .pipe(process.stdout);

function write(line,_,next){
    this.push(line.toString().toUpperCase())
    next();
})
function end(done){
    done();
})