MÓDULO 04 · CONCEITO 11 DE 14

Padrões: worker pool, fan-out/fan-in, pipeline

Vocabulário canônico de design concorrente. Pool de workers, distribuição e agregação, pipeline com estágios — abstrações reutilizáveis em qualquer modelo.

Tempo de leitura ~22 min Pré-requisito Conceitos 04, 05, 06 Próximo Backpressure e flow control

Em 1978, Doug McIlroy propôs uma metáfora que mudaria o Unix: programas como filtros que leem da entrada padrão, transformam, e escrevem na saída padrão. Combinados via pipes (|), filtros pequenos compõem soluções complexas. cat arquivo.txt | grep "erro" | sort | uniq -c não foi inventado; foi a aplicação de uma ideia teórica que se tornou hábito de milhões de pessoas. McIlroy chamou aqueles filtros de coroutines (em referência direta a Conway). Com Pike e Thompson elaborando o conceito em Plan 9 nos anos 1980, a metáfora de pipeline saiu do shell e entrou no design de sistemas concorrentes.

Os padrões deste conceito — worker pool, fan-out/fan-in, pipeline — são generalizações dessa intuição. Eles aparecem em virtualmente todo sistema concorrente bem-projetado: web servers, brokers de mensagens, ETL pipelines, batch processors, scrapers, pipelines de machine learning. Não importa se você está em Go com goroutines, em Java com Streams, em C# com TPL Dataflow, em Python com asyncio, em Rust com tokio: o vocabulário é o mesmo. Saber nomear a estrutura é meio caminho para implementá-la corretamente.

Este conceito catalogará cada padrão com nome, uso, trade-offs e implementação canônica nas três linguagens. O conceito 12 trata backpressure (que aparece como tema transversal aqui), e o conceito 13 trata cancelamento. Os três juntos formam o ferramental de design concorrente em produção.

Pré-requisito mental: você sabe o que é uma goroutine, uma Task ou uma corotina (conceitos 02–04); sabe o que é um channel/queue (conceito 06); sabe que estado compartilhado custa caro (conceito 08). Aqui montamos blocos com essas primitivas.

Producer-Consumer — o padrão fundador

Antes dos demais, o padrão mais elementar de concorrência: producer-consumer. Um produtor gera trabalho; um consumidor processa. Eles operam em paralelo via uma fila intermediária que tampona variações de taxa entre eles. O ganho: produtor e consumidor podem ter taxas diferentes momentaneamente, sem bloquear um ao outro.

// Pseudocódigo
queue = BoundedQueue(capacidade=100)

// Produtor (em sua goroutine/thread/task)
fn produtor() {
    while temMaisTrabalho() {
        item = gerarItem()
        queue.colocar(item)   // bloqueia se cheia
    }
    queue.fechar()
}

// Consumidor (em outra)
fn consumidor() {
    while item = queue.tirar() {  // bloqueia se vazia
        processar(item)
    }
}

A queue é o ponto central. Bounded (com capacidade limitada) é a escolha defensiva: se o produtor é mais rápido que o consumidor, ele bloqueia em vez de fazer a fila crescer indefinidamente — backpressure natural. Unbounded aceita qualquer crescimento, com risco de OOM se as taxas divergirem muito.

Producer-consumer é a base dos demais padrões. Worker pool é "1 produtor, N consumidores". Fan-out/fan-in é "1 produtor, N consumidores, agregados em 1 saída". Pipeline é "sequência de producer-consumers encadeados". A análise de cada um é, no fundo, análise de filas conectadas.

Worker pool — limitando concorrência total

Worker pool é a generalização de producer-consumer com múltiplos consumidores trabalhando em paralelo. Um produtor (ou vários) coloca itens em uma fila; N workers consomem dessa fila e processam. A propriedade central: no máximo N itens estão sendo processados ao mesmo tempo — independentemente de quantos itens estejam pendentes.

Casos de uso recorrentes em produção:

O dimensionamento do pool é decisão crítica. Para CPU-bound, o ponto de partida é o número de cores físicos (não lógicos). Para I/O-bound, muito mais alto, limitado pela API/banco/memória. Sempre meça em vez de chutar — variar de 8 a 32 a 128 workers e observar throughput / latência tail é o caminho honesto. O conceito 14 fecha esse tópico.

Fan-out / Fan-in

Fan-out: um produtor gera trabalho que vai paralelamente para múltiplos consumidores. Cada item vai para algum consumidor (não para todos — isso seria broadcast, um padrão diferente). Fan-out é essencialmente o aspecto "saída" de um worker pool: um channel/queue compartilhada por vários workers.

Fan-in: múltiplos produtores escrevem em uma única saída. Um agregador consome de N fontes e produz um único stream. Útil quando você tem trabalho dividido entre vários workers e precisa juntar resultados.

Fan-out + fan-in juntos formam um padrão poderoso:

            ┌──→ worker 1 ──┐
input ─────┤                │
            ├──→ worker 2 ──┼──→ output
            │                │
            └──→ worker N ──┘
            ↑                ↑
          fan-out          fan-in

Aplicações típicas: imagens redimensionadas em paralelo, traduções de batch via API externa com paralelismo limitado, processamento de log que precisa de ordem global recombinada. Em Go, é a combinação mais natural de channels e goroutines — Ajmani publicou em 2014 o artigo canônico (Pipelines and cancellation) que codificou o padrão para a comunidade.

Pipeline — estágios encadeados

Pipeline é a generalização para múltiplos estágios sequenciais. Cada estágio recebe input de uma fila, processa, escreve em outra fila que alimenta o próximo estágio. O paralelismo acontece em duas dimensões: dentro de cada estágio (vários workers) e entre estágios (estágio 1 trabalhando no item N+1 enquanto estágio 2 trabalha no item N).

fonte ──→ [parse] ──→ [validar] ──→ [enriquecer] ──→ [persistir] ──→ saída
            ↑              ↑                ↑                ↑
         estágio 1      estágio 2        estágio 3        estágio 4
       (k workers)    (m workers)       (1 worker)       (j workers)

A potência do pipeline está em três propriedades:

Sistemas reais que são pipelines: ETL (extract-transform-load), processamento de imagens em câmeras digitais, compiladores (lex → parse → typecheck → optimize → emit), processadores de áudio em DAWs, frameworks de dados como Apache Beam e Spark Streaming.

O mesmo padrão nas três linguagens — worker pool

Para concretizar, o padrão worker pool é uma boa entrada. As três linguagens têm primitivas distintas, mas o pattern é reconhecidamente o mesmo.

C# — System.Threading.Channels + Parallel.ForEachAsync
using System.Threading.Channels;

async Task ProcessarJobs(IEnumerable<Job> jobs, int paraleloMax) {
    var canal = Channel.CreateBounded<Job>(new BoundedChannelOptions(100) {
        FullMode = BoundedChannelFullMode.Wait
    });

    // Produtor
    _ = Task.Run(async () => {
        foreach (var job in jobs) {
            await canal.Writer.WriteAsync(job);
        }
        canal.Writer.Complete();
    });

    // Pool de workers via Parallel.ForEachAsync (mais simples que loops manuais)
    await Parallel.ForEachAsync(
        canal.Reader.ReadAllAsync(),
        new ParallelOptions { MaxDegreeOfParallelism = paraleloMax },
        async (job, ct) => {
            await ProcessarUmJob(job, ct);
        });
}

Parallel.ForEachAsync (.NET 6+) é a forma idiomática moderna para worker pool em C#. Ele aceita IAsyncEnumerable<T> (perfeito para Channel.Reader.ReadAllAsync), MaxDegreeOfParallelism dimensiona o pool, e cancelamento propaga via CancellationToken. Para casos onde você precisa controle mais explícito, Task[] com Task.WhenAll e consumidores manuais ainda é válido.

Python — asyncio.Queue + TaskGroup
import asyncio
from typing import AsyncIterable

async def processar_jobs(
    jobs: AsyncIterable[Job], paralelo_max: int
) -> None:
    fila: asyncio.Queue[Job | None] = asyncio.Queue(maxsize=100)

    async def produtor():
        async for job in jobs:
            await fila.put(job)
        for _ in range(paralelo_max):
            await fila.put(None)         # sinaliza fim a cada worker

    async def worker(idx: int):
        while True:
            job = await fila.get()
            if job is None:
                fila.task_done()
                return
            try:
                await processar_um_job(job)
            finally:
                fila.task_done()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(produtor())
        for i in range(paralelo_max):
            tg.create_task(worker(i))

asyncio.Queue com maxsize dá backpressure. TaskGroup (Python 3.11+) garante structured concurrency — se qualquer worker levanta, o grupo inteiro é cancelado. Sentinela None sinaliza fim (uma por worker, para todos receberem). Para casos com erro estruturado, prefira asyncio.timeout e deixe TaskGroup propagar ExceptionGroup.

Go — goroutines + channel + errgroup
package main

import (
    "context"
    "golang.org/x/sync/errgroup"
)

func ProcessarJobs(ctx context.Context, jobs <-chan Job, paraleloMax int) error {
    g, ctx := errgroup.WithContext(ctx)

    for i := 0; i < paraleloMax; i++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case job, ok := <-jobs:
                    if !ok {
                        return nil
                    }
                    if err := processarUmJob(ctx, job); err != nil {
                        return err
                    }
                }
            }
        })
    }
    return g.Wait()
}

Channel buffered como entrada do pool é o idiomático. Cada worker é uma goroutine que lê do channel até ele fechar. errgroup dá structured concurrency: primeiro erro cancela ctx, outros workers veem o cancelamento via ctx.Done() e terminam. O produtor (não mostrado) deve fechar jobs para os workers saberem que acabou.

Pipeline em Go — exemplo canônico

Pipeline merece exemplo dedicado pela quantidade de detalhes sutis. O exemplo abaixo, em Go, segue de perto o padrão apresentado por Sameer Ajmani no blog oficial em 2014 — referência seminal para qualquer um aprendendo concorrência em Go.

package main

import (
    "context"
    "fmt"
    "sync"
)

// Estágio 1: gerar números 1..N
func gerar(ctx context.Context, n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= n; i++ {
            select {
            case <-ctx.Done():
                return
            case out <- i:
            }
        }
    }()
    return out
}

// Estágio 2: elevar ao quadrado (com paralelismo via fan-out)
func elevar(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    const workers = 4

    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range in {
                select {
                case <-ctx.Done():
                    return
                case out <- v * v:
                }
            }
        }()
    }
    go func() {
        wg.Wait()      // espera todos os workers
        close(out)     // só então fecha
    }()
    return out
}

// Estágio 3: somar tudo (single worker pra preservar ordem de soma)
func somar(ctx context.Context, in <-chan int) int {
    total := 0
    for v := range in {
        total += v
    }
    return total
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    nums := gerar(ctx, 100)
    quadrados := elevar(ctx, nums)
    soma := somar(ctx, quadrados)
    fmt.Println(soma)   // 338350
}

Pontos a observar. Cada estágio recebe context e respeita cancelamento via select. Cada estágio retorna um channel que ele mesmo fecha quando termina (via defer close(out) ou goroutine separada esperando WaitGroup). O estágio "elevar" tem fan-out interno (4 workers paralelos), depois fan-in implícito (todos escrevem em out); o canal só é fechado depois que todos os workers terminam — daí o WaitGroup separado.

Se algum estágio tiver erro, o cancel() propaga via context, todos os estágios saem dos seus select, fecham seus canais, e o pipeline drena limpo. Esse é o padrão que escala para pipelines reais com muitos estágios.

princípio orientador

Em pipeline, a regra mais importante: quem cria o canal é quem fecha o canal. Se um estágio recebe um canal externo, ele nunca fecha esse canal — só consome. Fechamento duplo é panic em Go; em outras linguagens, é bug sutil. A convenção elimina ambiguidade: cada channel tem dono claro de fim de vida.

Generator pattern — o produtor "preguiçoso"

Variação do producer-consumer onde o produtor gera valores sob demanda em vez de antecipadamente. Útil quando a fonte é infinita (stream sem fim) ou cara (gerar item custa computação). O consumidor "puxa" itens conforme precisa; o produtor avança somente quando há demanda.

Em Go, o pattern é uma goroutine produtora que escreve em um channel não bufferizado — ela bloqueia até o consumidor ler. Em Python, generator com yield faz isso naturalmente (visto no conceito 03). Em C#, IAsyncEnumerable com yield return async oferece equivalente.

// Go — gerador infinito de números primos
func primos(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := 2; ; n++ {
            if ehPrimo(n) {
                select {
                case <-ctx.Done():
                    return
                case out <- n:
                }
            }
        }
    }()
    return out
}

Generator é apropriado quando o consumidor decide quando parar (range com break, take(N)) ou quando o produtor é caro o suficiente que gerar tudo antecipadamente seria desperdício.

Scatter-gather

Pattern parecido com fan-out/fan-in, mas com semântica diferente: scatter envia cópias da mesma requisição para múltiplos backends; gather coleta as respostas e produz uma resposta agregada (a primeira, a mais rápida, todas). Common em sistemas distribuídos: consultar 3 réplicas de DB e retornar a primeira resposta válida; chamar 3 APIs de tradução em paralelo e usar a melhor; broadcast a vários microsserviços de busca e mesclar resultados.

Em Go, select com vários cases de receive implementa scatter-gather "first wins" diretamente. Em Python asyncio, asyncio.wait com FIRST_COMPLETED. Em C#, Task.WhenAny. Quando a aplicação é "todas precisam responder", usa-se WaitGroup/asyncio.gather/Task.WhenAll em vez. A diferença em uma palavra: WhenAny vs WhenAll.

Fan-out controlado por semáforo

Variação clássica: você quer fan-out, mas com limite de concorrência. Em vez de um pool de workers consumindo de channel, dispara N tarefas mas com semáforo controlando quantas executam ao mesmo tempo. Uso típico: você tem 10.000 URLs, quer todas processadas, mas só 50 requisições simultâneas ao remoto.

// Python
import asyncio
sem = asyncio.Semaphore(50)

async def buscar(url):
    async with sem:
        return await aiohttp_get(url)

# Dispara 10000 tarefas; só 50 executam de fato em qualquer momento
async with asyncio.TaskGroup() as tg:
    for url in todas_urls:
        tg.create_task(buscar(url))

Vantagem sobre worker pool tradicional: cada tarefa preserva sua identidade (e contexto). Worker pool com fila genérica perde a "individualidade" de cada job — se você precisa saber qual job estourou exceção, fica mais difícil. Com fan-out via semáforo, cada task é independente e tem seu próprio escopo.

Stages com diferente paralelismo

Em pipeline real, estágios diferentes tipicamente têm paralelismo apropriado diferente. Estágio que faz network I/O pode ter 100 workers; estágio que faz CPU intenso de compressão deve ter 4 (= cores físicos); estágio que escreve em banco deve ter o tamanho do connection pool. Forçar paralelismo uniforme é um erro comum.

A regra prática: dimensione cada estágio independentemente pela natureza de seu trabalho. Se você tem pipeline com buffer entre estágios (channel/queue bounded), o estágio mais lento determina throughput total — esse é o gargalo. Adicionar workers em estágios não-gargalo só desperdiça recursos. Profilar e identificar onde a fila enche revela o gargalo.

Erro em pipelines — propagação correta

Um problema sutil em pipelines: como propagar erro de um estágio até o caller? Três abordagens, em ordem de sofisticação:

Erro como item no canal

O canal carrega (valor, erro) tuples ou um Result<T,E>. Cada estágio repassa erros sem processar. O consumidor final lida. Funciona, mas a sintaxe fica verbose e o trecho "feliz" do código é poluído por checks de erro.

Cancelamento via context

Erro em qualquer estágio chama cancel() do contexto compartilhado; outros estágios veem o cancelamento e terminam graciosamente; o caller recebe via g.Wait() ou equivalente. É a forma idiomática em Go com errgroup.WithContext. Limpo, composto, idiomático.

Estruturas de stream com erro built-in

Bibliotecas modernas (Reactive Streams, Akka Streams, RxJava, Reactor, .NET TPL Dataflow) tratam erro como cidadão de primeira classe — Subscriber.onError, exceptions agregados, retry policies declarativas. Mais sofisticado, com mais curva de aprendizagem.

armadilha clássica

Pipeline sem cancelamento adequado causa goroutine leak em Go (e equivalentes em outras linguagens). Quando o consumidor final aborta cedo, estágios anteriores ficam bloqueados tentando escrever em canal que ninguém lê. Resultado: goroutines vivas para sempre, memória crescendo, bug invisível em laboratório que aparece em produção sob carga. Sempre propague context/CancellationToken em todos os estágios e respeite cancelamento em todo select/await.

Quando padrões dão errado

Padrões são abstrações úteis, mas aplicá-los onde não cabem é complicação inútil. Alguns sinais de que você está força-aplicando:

Worker pool para 5 itens

Se você tem N tarefas onde N é pequeno e conhecido, e cada uma é independente, Task.WhenAll/asyncio.gather/errgroup sem fila resolve. Worker pool com fila adiciona código sem ganho.

Pipeline para um único estágio

Se a transformação cabe em uma função, escreva uma função. Pipeline existe para compor; sem composição, é só ceremônia.

Fan-out para tarefa muito barata

Se cada item leva microssegundos para processar, o overhead de coordenação concorrente (criação de task, sincronização, coordenação) supera o ganho de paralelismo. Loop sequencial é melhor.

Pool muito pequeno em I/O-bound

Pool de 4 workers para chamar API HTTP é desperdício. Workers ficam bloqueados em network 95% do tempo; aumentar para 100 ou 1000 quase não custa memória (especialmente em modelos cooperativos como asyncio/goroutines). Dimensione por natureza do trabalho, não por intuição.

Como praticar

  1. Implemente um pipeline de scraping. Três estágios: fetch (network, paralelismo alto), parse (CPU, paralelismo = cores), persist (banco, paralelismo = pool). Use channels (Go) ou Queue (Python) ou Channel (C#) entre estágios. Cada estágio respeita context/cancellation. Documente o gargalo após rodar contra dataset real.
  2. Experimente fan-out com diferentes tamanhos de pool. Pegue trabalho I/O-bound (1000 chamadas HTTP a endpoint lento) e CPU-bound (calcular SHA-256 de 1000 arquivos). Para cada um, varie o pool de 1, 4, 16, 64, 256 workers. Plote tempo total e CPU usage. Vai descobrir que as duas curvas têm formas radicalmente diferentes.
  3. Force um goroutine leak e diagnostique. Em Go, escreva um pipeline simples sem propagar context. Faça o consumidor final retornar cedo (depois de receber primeiro item). Use runtime.NumGoroutine() ou endpoint /debug/pprof/goroutine?debug=2 para observar goroutines vivas. Adicione context end-to-end e veja o leak desaparecer.

Referências para aprofundar

  1. livro Concurrency in Go — Katherine Cox-Buday (2017). Caps. 4 e 5 catalogam padrões CSP em Go com exemplos prontos para produção. Pipeline, fan-out, fan-in, error handling — tudo coberto.
  2. livro Concurrent Programming on Windows — Joe Duffy (2008). Datado em alguns detalhes mas conceitualmente firme. Caps. sobre TPL e Dataflow apresentam padrões de forma sistemática.
  3. livro Designing Data-Intensive Applications — Martin Kleppmann (2017). Cap. 11 (Stream Processing) coloca pipeline em escala distribuída. Após dominar este conceito, leia para ver como tudo escala.
  4. livro Java Concurrency in Practice — Brian Goetz et al. (2006). Caps. 6 (Task Execution) e 8 (Applying Thread Pools) cobrem worker pool com profundidade rara, em Java mas universalmente aplicável.
  5. artigo Go Concurrency Patterns: Pipelines and cancellation — Sameer Ajmani (2014). go.dev/blog/pipelines — O artigo canônico que codificou o padrão pipeline em Go. Lê-se em meia hora; diz tudo sobre como fazer certo.
  6. artigo Advanced Go Concurrency Patterns — Sameer Ajmani (Google I/O 2013). go.dev/talks/2013/advconc.slide — Slides da palestra clássica. Padrões idiomáticos resolvendo problemas reais (subscription com retry, naming).
  7. artigo Patterns for Concurrent Programming — Doug Lea (1999, atualizado). gee.cs.oswego.edu/dl/cpj — Doug Lea catalogou padrões décadas antes de virarem mainstream. Influência direta em java.util.concurrent.
  8. artigo The Pyramid of Parallelism — Joe Duffy. joeduffyblog.com — Hierarquia de abstrações de paralelismo: thread → task → dataflow → reactive. Útil para situar onde cada padrão se encaixa.
  9. docs System.Threading.Channels (.NET docs). learn.microsoft.com/en-us/dotnet/api/system.threading.channels — Documentação oficial. Channel.CreateBounded, options, idiomatic usage.
  10. docs TPL Dataflow Library. learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library — Framework declarativo para pipelines em .NET. Estágios reusáveis, propagação de erro built-in.
  11. docs Reactive Streams Specification. reactive-streams.org — Especificação JVM-cross-language para pipelines de stream com backpressure. Base de Akka Streams, Reactor, RxJava 2+.
  12. vídeo Go Concurrency Patterns — Rob Pike (Google I/O 2012). YouTube. A palestra fundadora dos padrões CSP em Go. Pike apresenta worker pool, fan-in, multiplexing como composição natural.