Criando uma Readable Stream
Para criar uma readable stream, é necessário implementar o método _read da classe Readable do módulo node:stream. Esse método é o responsável por realizar a operação de leitura em uma stream.
Por exemplo, considere o exemplo abaixo de implementação de uma stream de leitura.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
if (i > 100) {
this.push(null)
} else {
this.push(i)
}
}
}
new OneToHundredStream()
.pipe(process.stdout)Note
O método
pushserve para uma readable stream fornecer as informações a serem consumidas. Se o valor especificado como argumento desse método pornull, isso quer dizer que não mais dados para ser enviado para o consumidor (fim da stream).
$ node src/streams.js
TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type number (1)O Node utiliza do mecanismo de chunk em streams para enviar pedaços de dados e para isso, não é possível utilizar tipos primitivos diretamente para enviá-los em lotes (uma vez que essa lógica não é desse tipo de dado), para isso o Node trabalha com o formato de buffer.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
if (i > 100) {
this.push(null)
} else {
const buf = Buffer.from(String(i))
this.push(buf)
}
}
}
new OneToHundredStream()
.pipe(process.stdout)Note
O buffer permite apenas receber dados do tipo
string, então caso esteja trabalhando com outro formato comonumberé preciso convertê-lo parastring.
$ node src/streams.js
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100% Com isso, conseguir criar nossa primeira stream de leitura, mas para transformá-la em um exemplo mais aplicado a um cenário real de mercado, vamos aplicar um delay de 1s para o enviado dos dados, simulando o processo de leitura de um arquivo .csv de 1GB de clientes.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
setTimeout(() => {
if (i > 100) {
this.push(null)
} else {
const buf = Buffer.from(String(i))
this.push(buf)
}
}, 1_000)
}
}
new OneToHundredStream()
.pipe(process.stdout)$ node src/streams.js
12345678910111213141516171819202122...Conclusão
O principal benefício de trabalhar com streams, no cenário de um grande volume de dados, é a possibilidade de poder manipulá-los mesmo que ainda não tenha lido a informação por completo.