Em 1978, Doug McIlroy propôs uma metáfora que mudaria o Unix:
programas como filtros que leem da entrada padrão, transformam,
e escrevem na saída padrão. Combinados via pipes (|),
filtros pequenos compõem soluções complexas. cat
arquivo.txt | grep "erro" | sort | uniq -c não foi inventado;
foi a aplicação de uma ideia teórica que se tornou hábito de
milhões de pessoas. McIlroy chamou aqueles filtros de
coroutines (em referência direta a Conway). Com Pike e
Thompson elaborando o conceito em Plan 9 nos anos 1980, a metáfora
de pipeline saiu do shell e entrou no design de sistemas
concorrentes.
Os padrões deste conceito — worker pool, fan-out/fan-in, pipeline — são generalizações dessa intuição. Eles aparecem em virtualmente todo sistema concorrente bem-projetado: web servers, brokers de mensagens, ETL pipelines, batch processors, scrapers, pipelines de machine learning. Não importa se você está em Go com goroutines, em Java com Streams, em C# com TPL Dataflow, em Python com asyncio, em Rust com tokio: o vocabulário é o mesmo. Saber nomear a estrutura é meio caminho para implementá-la corretamente.
Este conceito catalogará cada padrão com nome, uso, trade-offs e implementação canônica nas três linguagens. O conceito 12 trata backpressure (que aparece como tema transversal aqui), e o conceito 13 trata cancelamento. Os três juntos formam o ferramental de design concorrente em produção.
Pré-requisito mental: você sabe o que é uma goroutine, uma Task ou uma corotina (conceitos 02–04); sabe o que é um channel/queue (conceito 06); sabe que estado compartilhado custa caro (conceito 08). Aqui montamos blocos com essas primitivas.
Producer-Consumer — o padrão fundador
Antes dos demais, o padrão mais elementar de concorrência: producer-consumer. Um produtor gera trabalho; um consumidor processa. Eles operam em paralelo via uma fila intermediária que tampona variações de taxa entre eles. O ganho: produtor e consumidor podem ter taxas diferentes momentaneamente, sem bloquear um ao outro.
// Pseudocódigo
queue = BoundedQueue(capacidade=100)
// Produtor (em sua goroutine/thread/task)
fn produtor() {
while temMaisTrabalho() {
item = gerarItem()
queue.colocar(item) // bloqueia se cheia
}
queue.fechar()
}
// Consumidor (em outra)
fn consumidor() {
while item = queue.tirar() { // bloqueia se vazia
processar(item)
}
}
A queue é o ponto central. Bounded (com capacidade limitada) é a escolha defensiva: se o produtor é mais rápido que o consumidor, ele bloqueia em vez de fazer a fila crescer indefinidamente — backpressure natural. Unbounded aceita qualquer crescimento, com risco de OOM se as taxas divergirem muito.
Producer-consumer é a base dos demais padrões. Worker pool é "1 produtor, N consumidores". Fan-out/fan-in é "1 produtor, N consumidores, agregados em 1 saída". Pipeline é "sequência de producer-consumers encadeados". A análise de cada um é, no fundo, análise de filas conectadas.
Worker pool — limitando concorrência total
Worker pool é a generalização de producer-consumer com múltiplos consumidores trabalhando em paralelo. Um produtor (ou vários) coloca itens em uma fila; N workers consomem dessa fila e processam. A propriedade central: no máximo N itens estão sendo processados ao mesmo tempo — independentemente de quantos itens estejam pendentes.
Casos de uso recorrentes em produção:
- Rate-limiting de API externa: você precisa chamar uma API que aceita no máximo 50 requisições simultâneas. Pool de 50 workers garante que o limite é respeitado, sem precisar de semáforo separado.
- Processamento de jobs em fila: jobs chegam de Redis, RabbitMQ ou Kafka; pool de workers processa N em paralelo. Adicionar/reduzir workers ajusta throughput sem mudar a fila.
- Web scraping: um pool limita carga no host remoto e respeita robots.txt na prática (cada worker é uma unidade de respeito).
- Connection pooling: pool de conexões a banco de dados é literalmente um worker pool (cada conexão é um worker; queries são "jobs").
O dimensionamento do pool é decisão crítica. Para CPU-bound, o ponto de partida é o número de cores físicos (não lógicos). Para I/O-bound, muito mais alto, limitado pela API/banco/memória. Sempre meça em vez de chutar — variar de 8 a 32 a 128 workers e observar throughput / latência tail é o caminho honesto. O conceito 14 fecha esse tópico.
Fan-out / Fan-in
Fan-out: um produtor gera trabalho que vai paralelamente para múltiplos consumidores. Cada item vai para algum consumidor (não para todos — isso seria broadcast, um padrão diferente). Fan-out é essencialmente o aspecto "saída" de um worker pool: um channel/queue compartilhada por vários workers.
Fan-in: múltiplos produtores escrevem em uma única saída. Um agregador consome de N fontes e produz um único stream. Útil quando você tem trabalho dividido entre vários workers e precisa juntar resultados.
Fan-out + fan-in juntos formam um padrão poderoso:
┌──→ worker 1 ──┐
input ─────┤ │
├──→ worker 2 ──┼──→ output
│ │
└──→ worker N ──┘
↑ ↑
fan-out fan-in
Aplicações típicas: imagens redimensionadas em paralelo, traduções de batch via API externa com paralelismo limitado, processamento de log que precisa de ordem global recombinada. Em Go, é a combinação mais natural de channels e goroutines — Ajmani publicou em 2014 o artigo canônico (Pipelines and cancellation) que codificou o padrão para a comunidade.
Pipeline — estágios encadeados
Pipeline é a generalização para múltiplos estágios sequenciais. Cada estágio recebe input de uma fila, processa, escreve em outra fila que alimenta o próximo estágio. O paralelismo acontece em duas dimensões: dentro de cada estágio (vários workers) e entre estágios (estágio 1 trabalhando no item N+1 enquanto estágio 2 trabalha no item N).
fonte ──→ [parse] ──→ [validar] ──→ [enriquecer] ──→ [persistir] ──→ saída
↑ ↑ ↑ ↑
estágio 1 estágio 2 estágio 3 estágio 4
(k workers) (m workers) (1 worker) (j workers)
A potência do pipeline está em três propriedades:
- Composição: cada estágio é função pequena e testável isoladamente. Mudar comportamento de um estágio não afeta os outros.
- Paralelismo entre estágios: enquanto estágio 1 parseia o item N+10, estágio 4 está persistindo o item N. O throughput total é limitado pelo estágio mais lento (gargalo), e identificar o gargalo é direto.
- Streaming: você não precisa do conjunto inteiro em memória. Itens fluem; ao terminar a fonte, o pipeline drena e termina. Funciona com volumes que não cabem em RAM.
Sistemas reais que são pipelines: ETL (extract-transform-load), processamento de imagens em câmeras digitais, compiladores (lex → parse → typecheck → optimize → emit), processadores de áudio em DAWs, frameworks de dados como Apache Beam e Spark Streaming.
O mesmo padrão nas três linguagens — worker pool
Para concretizar, o padrão worker pool é uma boa entrada. As três linguagens têm primitivas distintas, mas o pattern é reconhecidamente o mesmo.
using System.Threading.Channels;
async Task ProcessarJobs(IEnumerable<Job> jobs, int paraleloMax) {
var canal = Channel.CreateBounded<Job>(new BoundedChannelOptions(100) {
FullMode = BoundedChannelFullMode.Wait
});
// Produtor
_ = Task.Run(async () => {
foreach (var job in jobs) {
await canal.Writer.WriteAsync(job);
}
canal.Writer.Complete();
});
// Pool de workers via Parallel.ForEachAsync (mais simples que loops manuais)
await Parallel.ForEachAsync(
canal.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = paraleloMax },
async (job, ct) => {
await ProcessarUmJob(job, ct);
});
}
Parallel.ForEachAsync (.NET 6+) é a forma
idiomática moderna para worker pool em C#. Ele aceita
IAsyncEnumerable<T> (perfeito para
Channel.Reader.ReadAllAsync),
MaxDegreeOfParallelism dimensiona o pool, e
cancelamento propaga via CancellationToken.
Para casos onde você precisa controle mais explícito,
Task[] com Task.WhenAll e
consumidores manuais ainda é válido.
import asyncio
from typing import AsyncIterable
async def processar_jobs(
jobs: AsyncIterable[Job], paralelo_max: int
) -> None:
fila: asyncio.Queue[Job | None] = asyncio.Queue(maxsize=100)
async def produtor():
async for job in jobs:
await fila.put(job)
for _ in range(paralelo_max):
await fila.put(None) # sinaliza fim a cada worker
async def worker(idx: int):
while True:
job = await fila.get()
if job is None:
fila.task_done()
return
try:
await processar_um_job(job)
finally:
fila.task_done()
async with asyncio.TaskGroup() as tg:
tg.create_task(produtor())
for i in range(paralelo_max):
tg.create_task(worker(i))
asyncio.Queue com maxsize dá
backpressure. TaskGroup (Python 3.11+) garante
structured concurrency — se qualquer worker levanta, o grupo
inteiro é cancelado. Sentinela None sinaliza fim
(uma por worker, para todos receberem). Para casos com
erro estruturado, prefira asyncio.timeout e
deixe TaskGroup propagar ExceptionGroup.
package main
import (
"context"
"golang.org/x/sync/errgroup"
)
func ProcessarJobs(ctx context.Context, jobs <-chan Job, paraleloMax int) error {
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < paraleloMax; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case job, ok := <-jobs:
if !ok {
return nil
}
if err := processarUmJob(ctx, job); err != nil {
return err
}
}
}
})
}
return g.Wait()
}
Channel buffered como entrada do pool é o idiomático. Cada
worker é uma goroutine que lê do channel até ele fechar.
errgroup dá structured concurrency: primeiro
erro cancela ctx, outros workers veem o
cancelamento via ctx.Done() e terminam. O
produtor (não mostrado) deve fechar jobs para
os workers saberem que acabou.
Pipeline em Go — exemplo canônico
Pipeline merece exemplo dedicado pela quantidade de detalhes sutis. O exemplo abaixo, em Go, segue de perto o padrão apresentado por Sameer Ajmani no blog oficial em 2014 — referência seminal para qualquer um aprendendo concorrência em Go.
package main
import (
"context"
"fmt"
"sync"
)
// Estágio 1: gerar números 1..N
func gerar(ctx context.Context, n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}()
return out
}
// Estágio 2: elevar ao quadrado (com paralelismo via fan-out)
func elevar(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
const workers = 4
for w := 0; w < workers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range in {
select {
case <-ctx.Done():
return
case out <- v * v:
}
}
}()
}
go func() {
wg.Wait() // espera todos os workers
close(out) // só então fecha
}()
return out
}
// Estágio 3: somar tudo (single worker pra preservar ordem de soma)
func somar(ctx context.Context, in <-chan int) int {
total := 0
for v := range in {
total += v
}
return total
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nums := gerar(ctx, 100)
quadrados := elevar(ctx, nums)
soma := somar(ctx, quadrados)
fmt.Println(soma) // 338350
}
Pontos a observar. Cada estágio recebe context e
respeita cancelamento via select. Cada estágio
retorna um channel que ele mesmo fecha quando termina (via
defer close(out) ou goroutine separada esperando
WaitGroup). O estágio "elevar" tem fan-out
interno (4 workers paralelos), depois fan-in implícito (todos
escrevem em out); o canal só é fechado depois que
todos os workers terminam — daí o WaitGroup
separado.
Se algum estágio tiver erro, o cancel() propaga
via context, todos os estágios saem dos seus
select, fecham seus canais, e o pipeline drena
limpo. Esse é o padrão que escala para pipelines reais com
muitos estágios.
Em pipeline, a regra mais importante: quem cria o canal é quem fecha o canal. Se um estágio recebe um canal externo, ele nunca fecha esse canal — só consome. Fechamento duplo é panic em Go; em outras linguagens, é bug sutil. A convenção elimina ambiguidade: cada channel tem dono claro de fim de vida.
Generator pattern — o produtor "preguiçoso"
Variação do producer-consumer onde o produtor gera valores sob demanda em vez de antecipadamente. Útil quando a fonte é infinita (stream sem fim) ou cara (gerar item custa computação). O consumidor "puxa" itens conforme precisa; o produtor avança somente quando há demanda.
Em Go, o pattern é uma goroutine produtora que escreve em um
channel não bufferizado — ela bloqueia até o consumidor ler.
Em Python, generator com yield faz isso
naturalmente (visto no conceito 03). Em C#, IAsyncEnumerable
com yield return async oferece equivalente.
// Go — gerador infinito de números primos
func primos(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := 2; ; n++ {
if ehPrimo(n) {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}
}()
return out
}
Generator é apropriado quando o consumidor decide quando parar
(range com break, take(N)) ou quando o
produtor é caro o suficiente que gerar tudo antecipadamente
seria desperdício.
Scatter-gather
Pattern parecido com fan-out/fan-in, mas com semântica diferente: scatter envia cópias da mesma requisição para múltiplos backends; gather coleta as respostas e produz uma resposta agregada (a primeira, a mais rápida, todas). Common em sistemas distribuídos: consultar 3 réplicas de DB e retornar a primeira resposta válida; chamar 3 APIs de tradução em paralelo e usar a melhor; broadcast a vários microsserviços de busca e mesclar resultados.
Em Go, select com vários cases de receive
implementa scatter-gather "first wins" diretamente. Em Python
asyncio, asyncio.wait com
FIRST_COMPLETED. Em C#, Task.WhenAny.
Quando a aplicação é "todas precisam responder", usa-se
WaitGroup/asyncio.gather/Task.WhenAll
em vez. A diferença em uma palavra: WhenAny vs
WhenAll.
Fan-out controlado por semáforo
Variação clássica: você quer fan-out, mas com limite de concorrência. Em vez de um pool de workers consumindo de channel, dispara N tarefas mas com semáforo controlando quantas executam ao mesmo tempo. Uso típico: você tem 10.000 URLs, quer todas processadas, mas só 50 requisições simultâneas ao remoto.
// Python
import asyncio
sem = asyncio.Semaphore(50)
async def buscar(url):
async with sem:
return await aiohttp_get(url)
# Dispara 10000 tarefas; só 50 executam de fato em qualquer momento
async with asyncio.TaskGroup() as tg:
for url in todas_urls:
tg.create_task(buscar(url))
Vantagem sobre worker pool tradicional: cada tarefa preserva sua identidade (e contexto). Worker pool com fila genérica perde a "individualidade" de cada job — se você precisa saber qual job estourou exceção, fica mais difícil. Com fan-out via semáforo, cada task é independente e tem seu próprio escopo.
Stages com diferente paralelismo
Em pipeline real, estágios diferentes tipicamente têm paralelismo apropriado diferente. Estágio que faz network I/O pode ter 100 workers; estágio que faz CPU intenso de compressão deve ter 4 (= cores físicos); estágio que escreve em banco deve ter o tamanho do connection pool. Forçar paralelismo uniforme é um erro comum.
A regra prática: dimensione cada estágio independentemente pela natureza de seu trabalho. Se você tem pipeline com buffer entre estágios (channel/queue bounded), o estágio mais lento determina throughput total — esse é o gargalo. Adicionar workers em estágios não-gargalo só desperdiça recursos. Profilar e identificar onde a fila enche revela o gargalo.
Erro em pipelines — propagação correta
Um problema sutil em pipelines: como propagar erro de um estágio até o caller? Três abordagens, em ordem de sofisticação:
Erro como item no canal
O canal carrega (valor, erro) tuples ou um
Result<T,E>. Cada estágio repassa erros sem
processar. O consumidor final lida. Funciona, mas a sintaxe
fica verbose e o trecho "feliz" do código é poluído por
checks de erro.
Cancelamento via context
Erro em qualquer estágio chama cancel() do
contexto compartilhado; outros estágios veem o cancelamento
e terminam graciosamente; o caller recebe via
g.Wait() ou equivalente. É a forma idiomática
em Go com errgroup.WithContext. Limpo,
composto, idiomático.
Estruturas de stream com erro built-in
Bibliotecas modernas (Reactive Streams, Akka Streams, RxJava, Reactor, .NET TPL Dataflow) tratam erro como cidadão de primeira classe — Subscriber.onError, exceptions agregados, retry policies declarativas. Mais sofisticado, com mais curva de aprendizagem.
Pipeline sem cancelamento adequado causa goroutine
leak em Go (e equivalentes em outras linguagens).
Quando o consumidor final aborta cedo, estágios anteriores
ficam bloqueados tentando escrever em canal que ninguém lê.
Resultado: goroutines vivas para sempre, memória crescendo,
bug invisível em laboratório que aparece em produção sob
carga. Sempre propague context/CancellationToken
em todos os estágios e respeite cancelamento em
todo select/await.
Quando padrões dão errado
Padrões são abstrações úteis, mas aplicá-los onde não cabem é complicação inútil. Alguns sinais de que você está força-aplicando:
Worker pool para 5 itens
Se você tem N tarefas onde N é pequeno e conhecido, e cada
uma é independente, Task.WhenAll/asyncio.gather/errgroup
sem fila resolve. Worker pool com fila adiciona código sem
ganho.
Pipeline para um único estágio
Se a transformação cabe em uma função, escreva uma função. Pipeline existe para compor; sem composição, é só ceremônia.
Fan-out para tarefa muito barata
Se cada item leva microssegundos para processar, o overhead de coordenação concorrente (criação de task, sincronização, coordenação) supera o ganho de paralelismo. Loop sequencial é melhor.
Pool muito pequeno em I/O-bound
Pool de 4 workers para chamar API HTTP é desperdício. Workers ficam bloqueados em network 95% do tempo; aumentar para 100 ou 1000 quase não custa memória (especialmente em modelos cooperativos como asyncio/goroutines). Dimensione por natureza do trabalho, não por intuição.
Como praticar
- Implemente um pipeline de scraping. Três estágios: fetch (network, paralelismo alto), parse (CPU, paralelismo = cores), persist (banco, paralelismo = pool). Use channels (Go) ou Queue (Python) ou Channel (C#) entre estágios. Cada estágio respeita context/cancellation. Documente o gargalo após rodar contra dataset real.
- Experimente fan-out com diferentes tamanhos de pool. Pegue trabalho I/O-bound (1000 chamadas HTTP a endpoint lento) e CPU-bound (calcular SHA-256 de 1000 arquivos). Para cada um, varie o pool de 1, 4, 16, 64, 256 workers. Plote tempo total e CPU usage. Vai descobrir que as duas curvas têm formas radicalmente diferentes.
-
Force um goroutine leak e diagnostique. Em
Go, escreva um pipeline simples sem propagar context.
Faça o consumidor final retornar cedo (depois de receber
primeiro item). Use
runtime.NumGoroutine()ou endpoint/debug/pprof/goroutine?debug=2para observar goroutines vivas. Adicionecontextend-to-end e veja o leak desaparecer.
Referências para aprofundar
- livro Concurrency in Go — Katherine Cox-Buday (2017).
- livro Concurrent Programming on Windows — Joe Duffy (2008).
- livro Designing Data-Intensive Applications — Martin Kleppmann (2017).
- livro Java Concurrency in Practice — Brian Goetz et al. (2006).
- artigo Go Concurrency Patterns: Pipelines and cancellation — Sameer Ajmani (2014).
- artigo Advanced Go Concurrency Patterns — Sameer Ajmani (Google I/O 2013).
- artigo Patterns for Concurrent Programming — Doug Lea (1999, atualizado).
- artigo The Pyramid of Parallelism — Joe Duffy.
- docs System.Threading.Channels (.NET docs).
- docs TPL Dataflow Library.
- docs Reactive Streams Specification.
- vídeo Go Concurrency Patterns — Rob Pike (Google I/O 2012).