Как реализовать записываемый поток
Я хочу передать данные из потока amazon kinesis в журнал s3 или журнал bunyan.
Пример работает с потоком записи файла или stdout. Как я буду имплицировать свой собственный записываемый поток?
//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)
Это не работает, говоря, что у него нет метода " on "
var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
console.log(data);
};
kinesisSource.pipe(stream);
Какие методы я должен реализовать для моего собственного пользовательского записываемого потока, документы, кажется, указывают, что мне нужно реализовать "write", А не " on "
2 ответа:
Чтобы создать свой собственный записываемый поток, у вас есть три возможности.
Создайте свой собственный класс
Для этого вам потребуется 1) расширить класс Writable 2) вызвать конструктор Writable в вашем собственном конструкторе 3) определить метод
_write()
в прототипе вашего объекта stream.Вот пример:
var stream = require('stream'); var util = require('util'); function EchoStream () { // step 2 stream.Writable.call(this); }; util.inherits(EchoStream, stream.Writable); // step 1 EchoStream.prototype._write = function (chunk, encoding, done) { // step 3 console.log(chunk.toString()); done(); } var myStream = new EchoStream(); // instanciate your brand new stream process.stdin.pipe(myStream);
Расширение пустого записываемого объекта
Вместо определения нового типа объекта можно создать экземпляр пустого объекта
Writable
и реализовать_write()
метод:var stream = require('stream'); var echoStream = new stream.Writable(); echoStream._write = function (chunk, encoding, done) { console.log(chunk.toString()); done(); }; process.stdin.pipe(echoStream);
Используйте упрощенный конструктор API
Если вы используете io.js, вы можете использовать упрощенный конструктор API :
var writable = new stream.Writable({ write: function(chunk, encoding, next) { console.log(chunk.toString()); next(); } });
Используйте класс ES6 в узле 4+
class EchoStream extends stream.Writable { _write(chunk, enc, next) { console.log(chunk.toString()); next(); } }
На самом деле создать записываемый поток довольно просто. Вот пример:
var fs = require('fs'); var Stream = require('stream'); var ws = new Stream; ws.writable = true; ws.bytes = 0; ws.write = function(buf) { ws.bytes += buf.length; } ws.end = function(buf) { if(arguments.length) ws.write(buf); ws.writable = false; console.log('bytes length: ' + ws.bytes); } fs.createReadStream('file path').pipe(ws);
Также, если вы хотите создать свой собственный класс, @Paul даст хороший ответ.