MÓDULO 09 · CONCEITO 09 DE 14

Apache Kafka

log distribuído · partições · consumer groups · retenção · Kafka Streams · quando Kafka vence filas

Tempo de leitura ~24 min Pré-requisito Conceito 08 — Message Queues Próximo 10 · Padrões de Mensageria Assíncrona

Kafka não é uma message queue — é um log distribuído e persistente. A distinção importa: em uma fila, mensagens são deletadas ao serem consumidas. No Kafka, mensagens ficam armazenadas por um período configurável (horas, dias, semanas) e qualquer consumidor pode ler qualquer offset do log a qualquer momento. Isso muda fundamentalmente o que é possível: múltiplos sistemas independentes consumindo o mesmo stream de eventos sem coordenação, replay de eventos históricos, e reprocessamento de dados de qualquer ponto no tempo.

Arquitetura fundamental

O log como abstração central

Um tópico Kafka é um log append-only particionado. Cada mensagem escrita recebe um offset — um número sequencial imutável dentro da partição. Consumidores leem o log a partir de um offset e avançam o cursor (commit do offset) explicitamente. O broker não rastreia o que foi "consumido" — apenas armazena o log. Isso inverte a responsabilidade: o consumidor controla sua posição no log, não o broker.

# Estrutura de um tópico com 3 partições
Topic: orders

Partition 0: [offset 0][offset 1][offset 2][offset 3]...  → replica em broker 1, 2
Partition 1: [offset 0][offset 1][offset 2]...            → replica em broker 2, 3
Partition 2: [offset 0][offset 1][offset 2][offset 3][offset 4]...  → replica em broker 1, 3

# Cada partição tem:
# - Um leader (recebe writes e reads)
# - N-1 followers (réplicas síncronas ou assíncronas)
# - ISR (In-Sync Replicas): conjunto de réplicas consideradas "atualizadas"

Partições e paralelismo

Partições são a unidade de paralelismo do Kafka. O número de partições de um tópico define o paralelismo máximo de consumo — você não pode ter mais consumers ativos em um grupo do que partições. Se um tópico tem 12 partições, o consumer group pode ter no máximo 12 consumers ativos; consumers adicionais ficam ociosos como standby.

nota Quantas partições? Regra prática: estime o throughput alvo dividido pelo throughput por partição (~10-50MB/s dependendo do hardware). Para tópicos de eventos de negócio sem requisito de throughput extremo, 6-12 partições é um bom ponto de partida — permitem escalar até 12 consumers e têm overhead de metadados gerenciável. Aumentar partições depois é possível mas quebra a garantia de ordenação por key.

Partition key e ordenação

Kafka garante ordenação apenas dentro de uma partição. Para garantir que eventos de um mesmo entidade (pedido, cliente) sejam processados em ordem, use a partition key — o broker mapeia a key para a mesma partição via hash consistente.

# Ordenação por customer_id: todos os eventos do mesmo cliente → mesma partição
producer.send("orders",
    key=customer_id.encode(),    # key determina a partição
    value=event_json.encode()
)

# Sem key → round-robin entre partições (sem garantia de ordem)
producer.send("metrics", value=metric_json.encode())

# Cuidado: partition key com distribuição ruim → hot partition
# Se 80% dos pedidos são de 10 customers → 80% do tráfego em 10 partições
# Solução: compor a key (customer_id + hour) ou usar consistent hashing externo

Consumer Groups

Consumer groups são o mecanismo de escalonamento do lado do consumidor. Cada grupo recebe todas as mensagens do tópico, mas cada partição é atribuída a exatamente um consumer dentro do grupo. Grupos diferentes consomem o tópico independentemente, do seu próprio offset — sem interferência entre si.

# Topic: orders (3 partições)
# Consumer Group A (serviço de inventário): 3 consumers
Consumer A-1 → lê Partition 0
Consumer A-2 → lê Partition 1
Consumer A-3 → lê Partition 2

# Consumer Group B (serviço de faturamento): 1 consumer
Consumer B-1 → lê Partition 0, 1, 2 (todas — consumer único no grupo)

# Consumer Group C (analytics): 5 consumers
Consumer C-1 → lê Partition 0
Consumer C-2 → lê Partition 1
Consumer C-3 → lê Partition 2
Consumer C-4, C-5 → ociosos (mais consumers que partições)

Rebalancing

Quando um consumer entra ou sai do grupo, o Kafka realoca partições entre os consumers ativos — chamado de rebalancing. Durante o rebalance, o processamento para temporariamente (stop-the-world). Em Kafka moderno (v2.4+), o cooperative rebalancing (incremental) minimiza o impacto: apenas as partições que precisam ser movidas são pausadas, não todas.

# Configurações críticas para rebalancing
group.id=inventory-service        # identificador do consumer group
session.timeout.ms=45000          # tempo até considerar consumer morto
heartbeat.interval.ms=3000        # frequência de heartbeat (deve ser < session.timeout/3)
max.poll.interval.ms=300000       # tempo máximo entre polls (aumentar para processamento lento)
partition.assignment.strategy=CooperativeStickyAssignor  # minimiza rebalancing

Retenção e compactação

Retenção por tempo e tamanho

Mensagens no Kafka expiram após um período configurável — não quando são consumidas. Isso é fundamental: você pode ter um consumer que processa tudo em 1 segundo e outro que processa em 1 hora, ambos no mesmo tópico, sem que um afete o outro.

# Configuração de retenção por tópico
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name orders \
  --alter \
  --add-config retention.ms=604800000  # 7 dias
              retention.bytes=10737418240  # 10GB por partição (o que vier primeiro)

Log compaction

Para tópicos onde o que importa é o estado atual de cada key (não o histórico completo), log compaction mantém apenas a mensagem mais recente por partition key. O Kafka remove mensagens antigas com a mesma key, mas garante que a última versão nunca é deletada. Perfeito para changelogs de banco de dados e tabelas de lookup.

# Tópico compactado — changelog de clientes
# Kafka mantém apenas o registro mais recente por customer_id
[customer-123: {name: "Ana", plan: "basic"}]     ← antigo, será removido
[customer-456: {name: "João", plan: "pro"}]
[customer-123: {name: "Ana Silva", plan: "pro"}] ← este fica (mais recente)

# Configurar compaction
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name customers-state \
  --alter --add-config cleanup.policy=compact

# Tombstone: publicar mensagem com value=null → deleta a key do log compactado
producer.send("customers-state", key="customer-123", value=None)

Garantias de entrega no produtor

# acks=0: fire-and-forget — sem confirmação
# acks=1: leader confirma → risco de perda se leader cair antes de replicar
# acks=all (ou -1): todos os ISRs confirmam → máxima durabilidade

# Configuração de produtor para durabilidade máxima
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    acks='all',                # aguardar todos os ISRs
    retries=2147483647,        # retry infinito até timeout
    max_in_flight_requests_per_connection=1,  # manter ordem com retries
    enable_idempotence=True,   # exactly-once no produtor (dedup por sequence number)
    delivery_timeout_ms=120000,
    linger_ms=5,               # aguardar 5ms para batching
    batch_size=16384,          # tamanho máximo do batch
    compression_type='snappy', # compressão → menos I/O
)
atenção enable_idempotence e ordenação: com enable_idempotence=True, o Kafka garante que o produtor não duplica mensagens mesmo com retries — usando sequence numbers por partição. Para manter ordenação com retries, defina max_in_flight_requests_per_connection=1 (ou até 5 com idempotência ativada, que Kafka gerencia a reordenação). Sem idempotência + retries = possível duplicação.

Kafka Streams e ksqlDB

Kafka Streams é uma biblioteca Java/Scala para processamento de streams diretamente dentro de uma aplicação — sem cluster separado como Spark ou Flink. O processamento acontece nos mesmos processos que consomem os tópicos, com estado local em RocksDB e fault-tolerance via changelog topics.

# Kafka Streams — exemplo em pseudocódigo (Java API)
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders = builder.stream("orders");

// Filtrar apenas pedidos aprovados
KStream<String, Order> approved = orders
    .filter((key, value) -> value.getStatus() == OrderStatus.APPROVED);

// Contar pedidos por cliente em janela de 1 hora (windowed aggregation)
KTable<Windowed<String>, Long> ordersByCustomer = approved
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();

// Publicar resultado em outro tópico
ordersByCustomer
    .toStream()
    .map((key, count) -> KeyValue.pair(key.key(), count.toString()))
    .to("orders-per-customer-hourly");

// Estado local consultável via interactive queries (sem round-trip ao Kafka)
ReadOnlyWindowStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("orders-count-store",
        QueryableStoreTypes.windowStore()));
long count = store.fetch("customer-123", startTime, endTime);

ksqlDB — SQL sobre streams

# ksqlDB — processar streams com SQL
# Criar stream a partir de tópico Kafka
CREATE STREAM orders_stream (
  order_id VARCHAR,
  customer_id VARCHAR,
  total DOUBLE,
  status VARCHAR,
  created_at TIMESTAMP
) WITH (
  KAFKA_TOPIC='orders',
  VALUE_FORMAT='JSON',
  TIMESTAMP='created_at'
);

# Agregação contínua — atualiza conforme novos eventos chegam
CREATE TABLE orders_summary AS
  SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(total) AS total_spent
  FROM orders_stream
  WHERE status = 'APPROVED'
  GROUP BY customer_id
  EMIT CHANGES;

# Push query — retorna resultados continuamente (como subscribe)
SELECT * FROM orders_summary WHERE customer_id = 'cust-123' EMIT CHANGES;

Kafka vs RabbitMQ — quando usar cada um

AspectoKafkaRabbitMQ / SQS
ModeloLog distribuído — consumers controlam offsetFila — broker rastreia o que foi consumido
RetençãoDias/semanas — replay possível a qualquer momentoAté o ACK — mensagem deletada após consumo
Múltiplos consumersN consumer groups independentes no mesmo tópicoCada consumer recebe mensagens diferentes (competing consumers)
ThroughputMilhões de eventos/s com batching e compressãoDezenas de milhares de msgs/s
LatênciaTipicamente 5-15ms (com linger_ms) — não ideal para RPCSub-milissegundo possível com acks=0
OrdenaçãoGarantida dentro da partiçãoFIFO com SQS FIFO; melhor esforço em RabbitMQ
Ops complexityAlta — ZooKeeper/KRaft, rebalancing, retention tuningModerada — mais simples de operar
Ideal paraEvent sourcing, audit log, pipelines de dados, CDCTask queues, RPC assíncrono, notificações
dica Regra prática: use Kafka quando precisa que múltiplos consumidores independentes processem o mesmo evento, quando precisar de replay histórico, ou quando o throughput é >100k eventos/s. Use RabbitMQ/SQS para task queues simples onde cada mensagem deve ser processada exatamente por um worker e o replay não é necessário. A maioria dos sistemas de microsserviços pequenos não precisa de Kafka.

Comparação por linguagem

Produção e consumo de eventos no Kafka com tratamento correto de offsets, erros e rebalancing em cada linguagem.

C# — Confluent.Kafka
// Producer com idempotência e confirmação
using Confluent.Kafka;
using System.Text.Json;

var producerConfig = new ProducerConfig
{
    BootstrapServers = "kafka:9092",
    Acks = Acks.All,
    EnableIdempotence = true,
    MessageTimeoutMs = 30000,
    LingerMs = 5,
    BatchSize = 16384,
    CompressionType = CompressionType.Snappy,
};

using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

var orderEvent = new OrderCreated(orderId, customerId, total);
var message = new Message<string, string>
{
    Key = customerId,               // partition key — mesmo cliente → mesma partição
    Value = JsonSerializer.Serialize(orderEvent),
    Headers = new Headers
    {
        { "event_type", System.Text.Encoding.UTF8.GetBytes("order.created") },
        { "schema_version", System.Text.Encoding.UTF8.GetBytes("1") },
    },
};

// Publicar com confirmação assíncrona
var result = await producer.ProduceAsync("orders", message, cancellationToken);
logger.LogInformation(
    "Publicado: topic={Topic} partition={Partition} offset={Offset}",
    result.Topic, result.Partition.Value, result.Offset.Value);

// ---------------------------------------------------------------
// Consumer com commit manual e graceful shutdown
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "kafka:9092",
    GroupId = "inventory-service",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false,       // commit manual — após processar
    EnableAutoOffsetStore = false,  // controle explícito do offset store
    MaxPollIntervalMs = 300000,
    SessionTimeoutMs = 45000,
    HeartbeatIntervalMs = 3000,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        logger.LogInformation("Partições atribuídas: {Partitions}",
            string.Join(", ", partitions));
    })
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        logger.LogInformation("Partições revogadas — fazendo commit antes do rebalance");
        c.Commit(partitions);
    })
    .Build();

consumer.Subscribe("orders");

try
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var consumeResult = consumer.Consume(cancellationToken);
        if (consumeResult is null) continue;

        try
        {
            var order = JsonSerializer.Deserialize<OrderCreated>(consumeResult.Message.Value)!;
            await inventoryService.ReserveStockAsync(order, cancellationToken);

            // Commit apenas após processar com sucesso
            consumer.StoreOffset(consumeResult);  // store para commit em batch
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            logger.LogError(ex, "Erro ao processar offset {Offset}", consumeResult.Offset);
            // Estratégia de falha: logar + continuar (Kafka não tem DLQ nativo)
            // Publicar em tópico de dead-letter manualmente
            await PublishToDeadLetter(consumeResult, ex);
            consumer.StoreOffset(consumeResult); // não travar — avançar offset
        }

        // Commit em batch a cada 5s (via AutoCommit interno com EnableAutoOffsetStore)
    }
}
finally
{
    consumer.Commit();
    consumer.Close();
}

C# usa EnableAutoOffsetStore=false + StoreOffset manual para controlar exatamente quando o offset é marcado como processado, sem perder mensagens em caso de crash entre o consumo e o processamento.

Python — confluent-kafka-python
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
import json
import logging
import signal
import sys

logger = logging.getLogger(__name__)

# Producer
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'snappy',
    'message.timeout.ms': 30000,
})

def delivery_report(err, msg):
    if err:
        logger.error("Falha ao publicar: %s", err)
    else:
        logger.debug("Publicado: %s [%d] @%d", msg.topic(), msg.partition(), msg.offset())

def publish_order_created(order: dict) -> None:
    producer.produce(
        topic='orders',
        key=order['customer_id'],      # partition key
        value=json.dumps(order).encode(),
        headers={'event_type': 'order.created', 'version': '1'},
        callback=delivery_report,
    )
    producer.poll(0)  # disparar callbacks pendentes sem bloquear

# Flush ao encerrar
def shutdown():
    logger.info("Flushing produtor...")
    producer.flush(timeout=10)

# Consumer com commit manual
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'inventory-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,      # commit manual
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 3000,
    'partition.assignment.strategy': 'cooperative-sticky',
})

running = True

def handle_signal(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

def on_assign(consumer, partitions):
    logger.info("Partições atribuídas: %s", [p.partition for p in partitions])

def on_revoke(consumer, partitions):
    logger.info("Commit antes do rebalance")
    consumer.commit(offsets=partitions)

consumer.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)

dead_letter_producer = Producer({'bootstrap.servers': 'kafka:9092'})

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError.PARTITION_EOF:
                logger.debug("Fim da partição %d", msg.partition())
            else:
                raise KafkaException(msg.error())
            continue

        try:
            event = json.loads(msg.value())
            process_inventory(event)

            # Commit após processar
            consumer.commit(message=msg, asynchronous=False)

        except ProcessingError as e:
            logger.error("Erro de negócio: %s — enviando para DLT", e)
            # Dead letter topic manual (Kafka não tem DLQ nativo)
            dead_letter_producer.produce(
                'orders.dead-letter',
                key=msg.key(),
                value=msg.value(),
                headers={
                    'original_topic': 'orders',
                    'error': str(e),
                    'original_offset': str(msg.offset()),
                },
            )
            consumer.commit(message=msg, asynchronous=False)

finally:
    consumer.close()
    dead_letter_producer.flush()

Python usa enable.auto.commit=False com consumer.commit(asynchronous=False) após cada mensagem processada. Para alto throughput, commitar a cada N mensagens ou a cada intervalo de tempo reduz overhead de commits.

Go — franz-go (high-performance Kafka client)
package kafka

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"

    "github.com/twmb/franz-go/pkg/kgo"
)

// Producer com idempotência
func NewProducer(brokers []string) (*kgo.Client, error) {
    return kgo.NewClient(
        kgo.SeedBrokers(brokers...),
        kgo.RequiredAcks(kgo.AllISRAcks()),   // acks=all
        kgo.RecordDeliveryTimeout(30e9),       // 30s timeout
        kgo.ProducerBatchCompression(kgo.SnappyCompression()),
        kgo.ProducerLinger(5e6),               // 5ms linger para batching
        kgo.ProducerBatchMaxBytes(16384),
        kgo.WithLogger(kgo.BasicLogger(nil, kgo.LogLevelInfo, nil)),
    )
}

type OrderEvent struct {
    OrderID    string  `json:"order_id"`
    CustomerID string  `json:"customer_id"`
    Total      float64 `json:"total"`
}

func PublishOrderCreated(ctx context.Context, client *kgo.Client, order OrderEvent) error {
    body, _ := json.Marshal(order)

    record := &kgo.Record{
        Topic: "orders",
        Key:   []byte(order.CustomerID), // partition key
        Value: body,
        Headers: []kgo.RecordHeader{
            {Key: "event_type", Value: []byte("order.created")},
            {Key: "version", Value: []byte("1")},
        },
    }

    // ProduceSync aguarda confirmação do broker
    results := client.ProduceSync(ctx, record)
    for _, res := range results {
        if res.Err != nil {
            return fmt.Errorf("falha ao publicar: %w", res.Err)
        }
        slog.Info("publicado",
            "topic", res.Record.Topic,
            "partition", res.Record.Partition,
            "offset", res.Record.Offset)
    }
    return nil
}

// Consumer com commit manual e graceful shutdown
func NewConsumer(brokers []string, group string) (*kgo.Client, error) {
    return kgo.NewClient(
        kgo.SeedBrokers(brokers...),
        kgo.ConsumerGroup(group),
        kgo.ConsumeTopics("orders"),
        kgo.DisableAutoCommit(),                   // commit manual
        kgo.BlockRebalanceOnPoll(),                // não rebalancear durante poll
        kgo.OnPartitionsAssigned(func(ctx context.Context, c *kgo.Client, assigned map[string][]int32) {
            slog.Info("partições atribuídas", "count", len(assigned))
        }),
        kgo.OnPartitionsRevoked(func(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
            // Commit antes do rebalance
            if err := c.CommitUncommittedOffsets(ctx); err != nil {
                slog.Error("falha ao commitar antes do rebalance", "err", err)
            }
        }),
    )
}

func ConsumeOrders(ctx context.Context, client *kgo.Client, handler func(context.Context, OrderEvent) error) error {
    deadLetterProducer, _ := NewProducer([]string{"kafka:9092"})
    defer deadLetterProducer.Close()

    for {
        fetches := client.PollFetches(ctx)
        if errs := fetches.Errors(); len(errs) > 0 {
            for _, err := range errs {
                if err.Err == context.Canceled {
                    return nil
                }
                slog.Error("erro no fetch", "err", err.Err)
            }
            continue
        }

        fetches.EachRecord(func(record *kgo.Record) {
            var event OrderEvent
            if err := json.Unmarshal(record.Value, &event); err != nil {
                slog.Error("mensagem inválida", "err", err)
                publishDeadLetter(ctx, deadLetterProducer, record, err)
                return
            }

            if err := handler(ctx, event); err != nil {
                slog.Error("erro ao processar", "order_id", event.OrderID, "err", err)
                publishDeadLetter(ctx, deadLetterProducer, record, err)
                // Avançar offset mesmo em erro para não travar a partição
            }
        })

        // Commit em batch após processar todos os records do fetch
        if err := client.CommitUncommittedOffsets(ctx); err != nil {
            slog.Error("erro no commit", "err", err)
        }
    }
}

func publishDeadLetter(ctx context.Context, p *kgo.Client, original *kgo.Record, cause error) {
    p.ProduceSync(ctx, &kgo.Record{
        Topic: original.Topic + ".dead-letter",
        Key:   original.Key,
        Value: original.Value,
        Headers: append(original.Headers,
            kgo.RecordHeader{Key: "error", Value: []byte(cause.Error())},
            kgo.RecordHeader{Key: "original_offset", Value: []byte(fmt.Sprint(original.Offset))},
        ),
    })
}

franz-go é o cliente Kafka mais performático para Go — implementação pura Go sem cgo, com suporte completo a KIP modernas. O commit em batch após cada PollFetches é mais eficiente que commitar mensagem a mensagem.

Decisões de engenharia

Kafka vs RabbitMQ/SQS
Kafka é a escolha quando: múltiplos consumer groups independentes precisam processar o mesmo stream de eventos (inventário, faturamento e analytics consomem o mesmo tópico "orders" do seu próprio offset); quando precisa de replay histórico (reprocessar os últimos 7 dias de eventos após um bug); quando o throughput é >100k msg/s com batching; ou quando está construindo um event-sourcing system ou CDC pipeline. RabbitMQ/SQS é melhor quando: a mensagem deve ser processada por exatamente um worker (task queue), quando precisa de sub-milissegundo de latência, quando o roteamento por conteúdo (exchange topic com wildcards) é necessário, ou quando a equipe não tem capacidade de operar um cluster Kafka. A armadilha mais comum: usar Kafka para task queues simples — o overhead de cluster, rebalancing e ausência de DLQ nativa não compensa para casos que uma fila resolveria com menos complexidade.
Retenção por tempo vs Log Compaction
Retenção por tempo (cleanup.policy=delete) mantém todas as mensagens por N dias e então as deleta — adequado para streams de eventos onde o histórico completo importa por um período (audit log, replay de eventos de negócio, debugging). Log Compaction (cleanup.policy=compact) mantém apenas a mensagem mais recente por partition key — adequado para changelogs de estado onde o que importa é o estado atual, não o histórico (tabela de clientes, preços de produtos, configurações). Você pode combinar os dois (compact,delete): compactar mantendo apenas o mais recente E deletar versões antigas após N dias. A escolha depende de "precisamos do histórico ou apenas do estado atual?" — eventos de negócio geralmente querem histórico, tabelas de lookup querem apenas o estado atual.
acks=all vs acks=1 — durabilidade vs throughput
acks=all (com min.insync.replicas=2) garante que a mensagem está em pelo menos 2 réplicas antes de confirmar ao produtor — zero chance de perda mesmo com falha de um broker. O custo é latência adicional de ~5-10ms para replicação síncrona. acks=1 confirma quando apenas o leader recebeu — mais rápido, mas se o leader cair antes de replicar, a mensagem se perde. acks=0 é fire-and-forget sem confirmação — máximo throughput mas sem garantia. A escolha correta para a maioria dos sistemas de negócio é acks=all com min.insync.replicas=2 e enable.idempotence=true. Reserve acks=1 para métricas e logs onde alguma perda é aceitável. Nunca use acks=0 para dados de negócio.
Kafka Streams vs Flink / Spark Streaming
Kafka Streams é uma biblioteca Java — não um cluster separado. O estado fica em RocksDB local com changelog topics para fault-tolerance. É a escolha quando: o processamento é em Java/Scala, a complexidade do stream é moderada (joins, aggregations, windowing simples), e você não quer operar um cluster Flink/Spark. Flink e Spark Structured Streaming são clusters separados com capacidades muito mais avançadas: joins complexos entre streams de fontes diferentes, ML pipelines, processamento com exactly-once entre múltiplas fontes e sinks, e escalonamento independente dos consumers. A regra: se o processamento pode ser expresso como operações sobre tópicos Kafka (filter, map, groupBy, window, join entre tópicos), Kafka Streams é suficiente e muito mais simples. Se o processamento envolve fontes mistas (Kafka + banco de dados + APIs) ou transformações estatísticas complexas, use Flink.

Exercícios práticos

  1. Configure um cluster Kafka local com 3 brokers (Docker Compose) e crie um tópico com 6 partições e fator de replicação 3. Use kafka-producer-perf-test para medir throughput. Critério: throughput com linger.ms=0 documentado vs linger.ms=20 — diferença de ≥2× esperada; ao matar 1 broker durante a produção (com acks=all), nenhuma mensagem é perdida (verificar via kafka-consumer-groups --describe e contagem de offsets); cluster se recupera quando o broker é reiniciado.
  2. Demonstre consumer groups: crie um tópico com 4 partições, inicie 2 consumers no mesmo grupo e publique 100 mensagens. Adicione um 3º consumer e observe o rebalancing. Critério: com 2 consumers, kafka-consumer-groups --describe mostra 2 partições por consumer; ao adicionar o 3º, distribuição muda para 2/1/1 partições; mensagens publicadas durante o rebalancing não são perdidas (total consumido = 100); consumer ocioso quando grupo tem 5 consumers para 4 partições.
  3. Implemente dead letter topic manual: falha após 3 tentativas move a mensagem para orders.dead-letter com headers de diagnóstico. Critério: mensagem que sempre lança exceção aparece em orders.dead-letter com headers original_topic, original_offset e error; consumer da DLT loga severity WARN com os dados; o offset da fila principal avança mesmo após a falha (não trava a partição); script de replay republica da DLT para a fila original e processa com sucesso após corrigir o handler.
  4. Configure log compaction em customer-profiles. Publique 10 atualizações do mesmo customer_id e force compaction com min.cleanable.dirty.ratio=0.01. Critério: kafka-console-consumer --from-beginning após compaction mostra apenas 1 registro por customer_id (10 customers → 10 registros, não 100); publicar tombstone (value=null) seguido de compaction remove o customer_id completamente do output; verificar via kafka-log-dirs que o tamanho do log reduziu.
  5. Implemente Kafka Streams para contar pedidos por cliente em janelas de 5 minutos, publicando resultados em orders-count. Critério: consumer do tópico orders-count mostra contagem atualizada a cada nova mensagem publicada; ao publicar 3 pedidos do cliente A e 2 do cliente B em menos de 5 minutos, o output mostra A:3 e B:2; após 5 minutos, nova janela começa do zero; eventos chegando fora de ordem (com timestamp antigo) são incluídos na janela correta.

Perguntas de entrevista

Qual a diferença fundamental entre Kafka e uma message queue tradicional como RabbitMQ?

A diferença central é o modelo de responsabilidade: em uma fila, o broker rastreia o que foi consumido e deleta a mensagem após o ACK. O consumidor não tem controle sobre sua posição. Em Kafka, o broker é apenas um log append-only imutável — ele não sabe o que foi "consumido". O consumidor mantém e commita seu próprio offset, controlando exatamente de onde quer ler.

As consequências práticas dessa diferença são enormes: (1) Múltiplos consumer groups independentes — cada um mantém seu próprio offset e processa o log inteiro do seu próprio ritmo, sem interferência. Em RabbitMQ, a mensagem vai para um consumer e desaparece. (2) Replay histórico — um novo serviço pode consumir eventos dos últimos 7 dias sem necessidade de reprocessamento manual. Em filas, mensagens deletadas são perdidas para sempre. (3) Auditabilidade — o log é a fonte da verdade, imutável. Você pode reprocessar qualquer período histórico.

O trade-off é complexidade operacional: Kafka requer gestão de partições, replication factor, ISR, e rebalancing de consumer groups. Para casos de uso simples (task queue, um worker processa e esquece), essa complexidade não se justifica. Kafka se justifica quando múltiplos sistemas independentes precisam do mesmo stream de eventos, quando replay é necessário, ou quando o throughput é muito alto.

Como o particionamento afeta ordenação e paralelismo, e como escolher uma boa partition key?

Kafka garante ordenação apenas dentro de uma partição. O número de partições define o paralelismo máximo de consumo — um consumer group com mais consumers do que partições tem consumers ociosos. Essa é a tensão central: mais partições = mais paralelismo, mas a ordenação global é sacrificada.

A partition key mapeia mensagens para partições via hash consistente — a mesma key sempre vai para a mesma partição, garantindo que eventos relacionados sejam processados em ordem por um único consumer. Para eventos de pedidos, customer_id como key garante que todos os eventos de um cliente sejam ordenados. Para eventos de pagamentos, account_id.

Armadilhas comuns: (1) Hot partition — se a distribuição das keys é desigual (10% dos clientes geram 90% do volume), as partições desses clientes recebem muito mais carga. Solução: compor a key (customer_id:timestamp_bucket) ou usar hashing externo mais granular. (2) Aumentar partições depois quebra o contrato de ordenação — mensagens que eram na mesma partição podem acabar em partições diferentes após o re-hash. Defina o número de partições com margem desde o início. (3) Sem key = round-robin — quando ordem não importa (métricas, logs), não use key e o Kafka distribuirá uniformemente.

O que acontece durante um rebalancing de consumer group e como minimizar o impacto?

Rebalancing é o processo de redistribuir partições entre os consumers de um grupo — disparado quando um consumer entra, sai, ou perde a conexão com o broker. O rebalancing clássico (eager protocol) é stop-the-world: todos os consumers param de processar, devolvem todas as partições, e aguardam a nova atribuição. Em grupos grandes, isso pode levar segundos a dezenas de segundos de pausa no processamento.

O rebalancing incremental cooperativo (disponível desde Kafka 2.4, configurável com partition.assignment.strategy=CooperativeStickyAssignor) melhora significativamente: apenas as partições que precisam ser movidas são pausadas — consumers que mantêm as mesmas partições continuam processando. Isso transforma um stop-the-world em uma redistribuição gradual.

Para minimizar rebalancing falso (disparado por consumer lento, não falho): aumentar max.poll.interval.ms para workloads lentas (processamento de 2 minutos por batch requer >120s), manter session.timeout.ms e heartbeat.interval.ms calibrados (heartbeat deve ser <1/3 do session timeout), e garantir que o processamento por poll não exceda max.poll.interval.ms (processar em batch menor se necessário). Commits de offset lentos também podem causar rebalancing — use commit assíncrono para não bloquear o poll loop.

Explique acks=all, ISR e min.insync.replicas — como esses parâmetros interagem para garantir durabilidade?

ISR (In-Sync Replicas): conjunto de réplicas que estão atualizadas com o leader — receberam todas as mensagens recentemente. Uma réplica sai do ISR se ficou muito atrasada (parametrizado por replica.lag.time.max.ms). O Kafka só considera eleição de novo leader entre réplicas do ISR.

acks=all: o produtor aguarda confirmação de que TODOS os membros do ISR receberam a mensagem. Se o ISR tem 3 réplicas, todas devem confirmar antes do ack ser enviado ao produtor. Se o ISR cair para 1 réplica (o leader), acks=all é satisfeito com apenas 1 réplica — o que pode ser perigoso.

min.insync.replicas (MINSR): número mínimo de réplicas que devem estar no ISR para aceitar writes. Com replication.factor=3 e min.insync.replicas=2, se apenas 1 réplica está no ISR (leader sozinho), o broker rejeita escritas com NotEnoughReplicasException — melhor rejeitar do que arriscar perda de dados. A combinação correta para durabilidade máxima: replication.factor=3 + min.insync.replicas=2 + acks=all no produtor. Isso tolera falha de 1 broker sem perda de dados, mas rejeita escritas se 2 brokers falham simultaneamente — o que é o comportamento correto para dados críticos.

O que é log compaction e quando usar em vez de retenção por tempo?

Log compaction é uma política de limpeza onde o Kafka mantém apenas a mensagem mais recente por partition key, removendo versões antigas da mesma key. A semântica é "o estado atual de cada entidade", não "o histórico completo de eventos". O processo de compaction roda em background — as mensagens antigas não são removidas imediatamente, mas em ciclos de limpeza configuráveis.

Use retenção por tempo (cleanup.policy=delete) quando: você precisa do histórico completo para auditoria, replay de eventos, ou análise temporal. Exemplos: stream de pedidos, log de ações do usuário, eventos de sistema. Mensagens expiram após N dias/semanas independente de terem a mesma key.

Use log compaction (cleanup.policy=compact) quando: o que importa é o estado atual de cada entidade, não o histórico. Exemplos: changelog de banco de dados (cada update substitui o anterior), tabela de preços (preço atual do produto), configurações de serviços. O tópico compactado funciona como uma "tabela distribuída" — um novo consumer pode construir o estado atual lendo do início e apenas o estado mais recente estará lá.

A combinação compact,delete é poderosa: mantém apenas o estado atual (compactação) E remove entidades após um período (retenção). Útil para changelogs de clientes onde você quer o estado atual mas não precisa de clientes deletados há mais de 30 dias. Tombstones (mensagens com value=null) marcam a deleção de uma key do log compactado.

Referências

  1. artigo The Log: What every software engineer should know about real-time data's unifying abstraction — Jay Kreps (2013). engineering.linkedin.com/distributed-systems/log — O artigo fundacional sobre logs distribuídos. Explica por que o log é a abstração unificadora de sistemas distribuídos e é a base conceitual do Kafka.
  2. livro Kafka: The Definitive Guide — Neha Narkhede, Gwen Shapira & Todd Palino (O'Reilly, 2021, 2ª ed.). O livro de referência do Kafka, escrito pelos criadores. Cobre arquitetura, producers, consumers, Kafka Streams, operação e tuning de performance.
  3. docs Apache Kafka Documentation — Apache Software Foundation. kafka.apache.org/documentation — Referência completa incluindo configurações de producer, consumer, broker e Kafka Streams. A seção de Design é leitura obrigatória para entender as garantias do sistema.
  4. artigo Kafka Improvement Proposals (KIPs) — Apache Kafka Community. cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals — Fonte primária para entender como funcionalidades foram projetadas. KIP-98 (exactly-once), KIP-429 (incremental rebalance) e KIP-500 (KRaft, sem ZooKeeper) são particularmente importantes.
  5. docs Confluent Developer Documentation — Confluent. developer.confluent.io — Tutoriais práticos, exemplos de código em múltiplas linguagens e cursos gratuitos sobre Kafka. O curso "Apache Kafka Fundamentals" é excelente para consolidar a teoria.
  6. paper Kafka: a Distributed Messaging System for Log Processing — Jay Kreps, Neha Narkhede & Jun Rao (NetDB Workshop, 2011). O paper original que introduziu o Kafka ao mundo acadêmico. Descreve o modelo de log particionado, garantias de ordenação por partição, e benchmarks comparando com ActiveMQ e RabbitMQ da época.
  7. docs Kafka Streams Developer Guide — Apache Software Foundation. kafka.apache.org/documentation/streams — Documentação oficial do Kafka Streams: topologias, KTable vs KStream, windowing, joins e stateful processing com RocksDB local. Essencial para quem usa Kafka além do simples produce/consume.
  8. docs Confluent Schema Registry Documentation — Confluent. docs.confluent.io/platform/current/schema-registry — Gerenciamento de schemas Avro, JSON Schema e Protobuf com evolução compatível. Explica as regras de compatibilidade (BACKWARD, FORWARD, FULL) e integração com serializadores Kafka.
  9. artigo Turning the Database Inside-Out with Apache Samza — Martin Kleppmann (Strange Loop, 2015). martin.kleppmann.com — Palestra/artigo que articula como logs imutáveis (como o do Kafka) invertem o modelo tradicional de banco de dados: o log é a fonte de verdade e as views (tabelas) são derivadas. Base conceitual do event sourcing moderno.
  10. livro Designing Event-Driven Systems — Ben Stopford (O'Reilly, 2018, acesso gratuito). confluent.io/designing-event-driven-systems — Cobre Event Sourcing, CQRS e stream processing com Kafka como plataforma. Explica quando usar tópicos Kafka como API pública versus interna e padrões de integração de microsserviços.
  11. blog Exactly-Once Semantics Are Possible: Here's How Kafka Does It — Confluent Engineering (2017). confluent.io/blog/exactly-once-semantics-are-possible — Explica a implementação de exactly-once via idempotent producer (PID + sequence numbers) e transações Kafka. Detalha o Two-Phase Commit interno e as limitações do EOS cross-cluster.
  12. blog Apache Kafka Without ZooKeeper: A Sneak Peek At the Simplest Kafka Yet — Confluent Engineering (2021). confluent.io/blog/kafka-without-zookeeper-a-sneak-peek — Descreve o KRaft (KIP-500): Kafka gerenciando seus próprios metadados com Raft em vez de depender do ZooKeeper. Explica o novo modelo de controller e implicações operacionais para Kafka 3.x+.