Criando uma Readable Stream


Para criar uma readable stream, é necessário implementar o método _read da classe Readable do módulo node:stream. Esse método é o responsável por realizar a operação de leitura em uma stream.

Por exemplo, considere o exemplo abaixo de implementação de uma stream de leitura.

import { Readable } from 'node:stream'
 
class OneToHundredStream extends Readable {
    index = 1
 
    _read() {
        const i = this.index++
 
        if (i > 100) {
            this.push(null)
        } else {
            this.push(i)
        }
    }
}
 
new OneToHundredStream()
    .pipe(process.stdout)

Note

O método push serve para uma readable stream fornecer as informações a serem consumidas. Se o valor especificado como argumento desse método por null, isso quer dizer que não mais dados para ser enviado para o consumidor (fim da stream).

$ node src/streams.js
 
TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type number (1)

O Node utiliza do mecanismo de chunk em streams para enviar pedaços de dados e para isso, não é possível utilizar tipos primitivos diretamente para enviá-los em lotes (uma vez que essa lógica não é desse tipo de dado), para isso o Node trabalha com o formato de buffer.

import { Readable } from 'node:stream'
 
class OneToHundredStream extends Readable {
    index = 1
 
    _read() {
        const i = this.index++
 
        if (i > 100) {
            this.push(null)
        } else {
            const buf = Buffer.from(String(i))
            this.push(buf)
        }
    }
}
 
new OneToHundredStream()
    .pipe(process.stdout)

Note

O buffer permite apenas receber dados do tipo string, então caso esteja trabalhando com outro formato como number é preciso convertê-lo para string.

$ node src/streams.js
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100%  

Com isso, conseguir criar nossa primeira stream de leitura, mas para transformá-la em um exemplo mais aplicado a um cenário real de mercado, vamos aplicar um delay de 1s para o enviado dos dados, simulando o processo de leitura de um arquivo .csv de 1GB de clientes.

import { Readable } from 'node:stream'
 
class OneToHundredStream extends Readable {
    index = 1
 
    _read() {
        const i = this.index++
 
        setTimeout(() => {
            if (i > 100) {
                this.push(null)
            } else {
                const buf = Buffer.from(String(i))
                this.push(buf)
            }
        }, 1_000)
    }
}
 
new OneToHundredStream()
    .pipe(process.stdout)
$ node src/streams.js
12345678910111213141516171819202122...

Conclusão

O principal benefício de trabalhar com streams, no cenário de um grande volume de dados, é a possibilidade de poder manipulá-los mesmo que ainda não tenha lido a informação por completo.

Referências