Узел.JS передает один и тот же читаемый поток в несколько (доступных для записи) целей


мне нужно запустить две команды подряд, которые должны считывать данные из одного потока. После передачи потока в другой буфер очищается, поэтому я не могу снова читать данные из этого потока, поэтому это не работает:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

запрос жалуется на этот

Error: You cannot pipe after data has been emitted from the response.

и меняем inputStream до fs.createWriteStream дает тот же вопрос, конечно. Я не хочу писать в файл, но использовать в некотором роде поток, который запрос производит (или любой другой, если на то пошло).

есть ли способ повторно использовать читаемый поток, как только он заканчивает трубой? Каков был бы лучший способ выполнить что-то вроде приведенного выше примера?

5 54

5 ответов:

вы должны создать дубликат потока, передавая его по трубопроводу в два потока. Вы можете создать простой поток с помощью сквозного потока, он просто передает вход на выход.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

выход:

8
hi user

первый ответ работает только в том случае, если потоки занимают примерно столько же времени для обработки данных. Если вы занимаете значительно больше времени, тем быстрее будете запрашивать новые данные, следовательно, перезаписывая данные, которые все еще используются более медленным (у меня была эта проблема после попытки решить ее с помощью дублирующего потока).

следующий шаблон работал очень хорошо для меня. Он использует библиотеку на основе Stream2 streams, Streamz и обещает синхронизировать асинхронные потоки через обратный вызов. С помощью знакомый пример из первого ответа:

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
  next(); //request more
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

для общей проблемы, следующий код работает нормально

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')

Как насчет трубопроводов в два или более потоков не в то же время ?

например :

var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
   mypass.pipe(file2);
},2000)

приведенный выше код не выдает никаких ошибок, но файл file2 пуст

у меня есть другое решение для записи в два потока одновременно, естественно, время для записи будет добавлением двух раз, но я использую его для ответа на запрос загрузки, где я хочу сохранить копию загруженного файла на моем сервере (на самом деле я использую резервную копию S3, поэтому я кэширую наиболее часто используемые файлы локально, чтобы избежать нескольких передач файлов)

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

затем вы можете использовать это как обычный OutputStream

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

и передать его в свой метод, как если бы это был ответ или fileOutputStream