Узел.JS передает один и тот же читаемый поток в несколько (доступных для записи) целей
мне нужно запустить две команды подряд, которые должны считывать данные из одного потока. После передачи потока в другой буфер очищается, поэтому я не могу снова читать данные из этого потока, поэтому это не работает:
var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');
var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);
inputStream.pipe(identify.stdin);
var chunks = [];
identify.stdout.on('data',function(chunk) {
chunks.push(chunk);
});
identify.stdout.on('end',function() {
var size = getSize(Buffer.concat(chunks)); //width
var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
inputStream.pipe(convert.stdin);
convert.stdout.pipe(fs.createWriteStream('half.png'));
});
function getSize(buffer){
return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}
запрос жалуется на этот
Error: You cannot pipe after data has been emitted from the response.
и меняем inputStream до fs.createWriteStream
дает тот же вопрос, конечно.
Я не хочу писать в файл, но использовать в некотором роде поток, который запрос производит (или любой другой, если на то пошло).
есть ли способ повторно использовать читаемый поток, как только он заканчивает трубой? Каков был бы лучший способ выполнить что-то вроде приведенного выше примера?
5 ответов:
вы должны создать дубликат потока, передавая его по трубопроводу в два потока. Вы можете создать простой поток с помощью сквозного потока, он просто передает вход на выход.
const spawn = require('child_process').spawn; const PassThrough = require('stream').PassThrough; const a = spawn('echo', ['hi user']); const b = new PassThrough(); const c = new PassThrough(); a.stdout.pipe(b); a.stdout.pipe(c); let count = 0; b.on('data', function (chunk) { count += chunk.length; }); b.on('end', function () { console.log(count); c.pipe(process.stdout); });
выход:
8 hi user
первый ответ работает только в том случае, если потоки занимают примерно столько же времени для обработки данных. Если вы занимаете значительно больше времени, тем быстрее будете запрашивать новые данные, следовательно, перезаписывая данные, которые все еще используются более медленным (у меня была эта проблема после попытки решить ее с помощью дублирующего потока).
следующий шаблон работал очень хорошо для меня. Он использует библиотеку на основе Stream2 streams, Streamz и обещает синхронизировать асинхронные потоки через обратный вызов. С помощью знакомый пример из первого ответа:
spawn = require('child_process').spawn; pass = require('stream').PassThrough; streamz = require('streamz').PassThrough; var Promise = require('bluebird'); a = spawn('echo', ['hi user']); b = new pass; c = new pass; a.stdout.pipe(streamz(combineStreamOperations)); function combineStreamOperations(data, next){ Promise.join(b, c, function(b, c){ //perform n operations on the same data next(); //request more } count = 0; b.on('data', function(chunk) { count += chunk.length; }); b.on('end', function() { console.log(count); c.pipe(process.stdout); });
для общей проблемы, следующий код работает нормально
var PassThrough = require('stream').PassThrough a=PassThrough() b1=PassThrough() b2=PassThrough() a.pipe(b1) a.pipe(b2) b1.on('data', function(data) { console.log('b1:', data.toString()) }) b2.on('data', function(data) { console.log('b2:', data.toString()) }) a.write('text')
Как насчет трубопроводов в два или более потоков не в то же время ?
например :
var PassThrough = require('stream').PassThrough; var mybiraryStream = stream.start(); //never ending audio stream var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'}) var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'}) var mypass = PassThrough mybinaryStream.pipe(mypass) mypass.pipe(file1) setTimeout(function(){ mypass.pipe(file2); },2000)
приведенный выше код не выдает никаких ошибок, но файл file2 пуст
у меня есть другое решение для записи в два потока одновременно, естественно, время для записи будет добавлением двух раз, но я использую его для ответа на запрос загрузки, где я хочу сохранить копию загруженного файла на моем сервере (на самом деле я использую резервную копию S3, поэтому я кэширую наиболее часто используемые файлы локально, чтобы избежать нескольких передач файлов)
/** * A utility class made to write to a file while answering a file download request */ class TwoOutputStreams { constructor(streamOne, streamTwo) { this.streamOne = streamOne this.streamTwo = streamTwo } setHeader(header, value) { if (this.streamOne.setHeader) this.streamOne.setHeader(header, value) if (this.streamTwo.setHeader) this.streamTwo.setHeader(header, value) } write(chunk) { this.streamOne.write(chunk) this.streamTwo.write(chunk) } end() { this.streamOne.end() this.streamTwo.end() } }
затем вы можете использовать это как обычный OutputStream
const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)
и передать его в свой метод, как если бы это был ответ или fileOutputStream