Criando uma Readable Stream
Para criar uma readable stream, é necessário implementar o método _read
da classe Readable
do módulo node:stream
. Esse método é o responsável por realizar a operação de leitura em uma stream.
Por exemplo, considere o exemplo abaixo de implementação de uma stream de leitura.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
if (i > 100) {
this.push(null)
} else {
this.push(i)
}
}
}
new OneToHundredStream()
.pipe(process.stdout)
Note
O método
push
serve para uma readable stream fornecer as informações a serem consumidas. Se o valor especificado como argumento desse método pornull
, isso quer dizer que não mais dados para ser enviado para o consumidor (fim da stream).
$ node src/streams.js
TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type number (1)
O Node utiliza do mecanismo de chunk em streams para enviar pedaços de dados e para isso, não é possível utilizar tipos primitivos diretamente para enviá-los em lotes (uma vez que essa lógica não é desse tipo de dado), para isso o Node trabalha com o formato de buffer.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
if (i > 100) {
this.push(null)
} else {
const buf = Buffer.from(String(i))
this.push(buf)
}
}
}
new OneToHundredStream()
.pipe(process.stdout)
Note
O buffer permite apenas receber dados do tipo
string
, então caso esteja trabalhando com outro formato comonumber
é preciso convertê-lo parastring
.
$ node src/streams.js
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100%
Com isso, conseguir criar nossa primeira stream de leitura, mas para transformá-la em um exemplo mais aplicado a um cenário real de mercado, vamos aplicar um delay de 1s para o enviado dos dados, simulando o processo de leitura de um arquivo .csv
de 1GB de clientes.
import { Readable } from 'node:stream'
class OneToHundredStream extends Readable {
index = 1
_read() {
const i = this.index++
setTimeout(() => {
if (i > 100) {
this.push(null)
} else {
const buf = Buffer.from(String(i))
this.push(buf)
}
}, 1_000)
}
}
new OneToHundredStream()
.pipe(process.stdout)
$ node src/streams.js
12345678910111213141516171819202122...
Conclusão
O principal benefício de trabalhar com streams, no cenário de um grande volume de dados, é a possibilidade de poder manipulá-los mesmo que ainda não tenha lido a informação por completo.