Паттерн Transactional Outbox с Kafka: как не потерять события при синхронизации баз данных

Описание паттерна Transactional Outbox

Обложка: Паттерн Transactional Outbox с Kafka: как не потерять события при синхронизации баз данных

Боль: данные разошлись, и вы не знаете когда

В нашей системе две базы данных:

- Остатки по счетам - текущий остаток по каждому счёту

- Остатки по клиентам - агрегированный баланс клиента по всем его счетам.

При каждом изменении остатка по счёту нужно обновить Остатки по счетам и уведомить Остатки по клиентам через Kafka. Логика простая. Но в какой-то момент клиент звонит в поддержку: его баланс в приложении не совпадает с реальным. Вы смотрите в обе базы — данные разошлись. Когда именно это произошло — непонятно.

Причина почти всегда одна и та же:

			1. UPDATE accounts SET amount = 500 WHERE account_id = 42; ✅
2. kafka.send("balance.changed", event); ❌ сбой сети

		

Транзакция закоммичена, событие не ушло. Базы разошлись. Никто не узнал.

Почему очевидные решения не работают

Первая реакция разработчика — обернуть отправку в try-catch и повторить при ошибке:

			accountRepository.updateBalance(accountId, newAmount); // транзакция закоммичена

try {

    kafkaTemplate.send("balance.changed", event).get();

} catch (Exception e) {

    // повторим позже? но транзакция уже закоммичена

}
		

Не работает: транзакция уже закоммичена до отправки. При сбое Kafka данные обновлены, событие потеряно. Retry помогает только если Kafka временно недоступна — но не при падении самого сервиса между коммитом и отправкой.

Вторая идея — отправить событие до коммита:

			kafkaTemplate.send("balance.changed", event).get(); // отправили

accountRepository.updateBalance(accountId, newAmount); // закоммитили
		

Тоже не работает: если коммит упадёт, Kafka уже получила событие об изменении, которого нет в базе. Данные снова разошлись, но в другую сторону.

Третья идея — двухфазный коммит (2PC). Это протокол, который координирует атомарный коммит сразу в нескольких системах. Звучит как решение, но на практике Kafka его не поддерживает в классическом смысле, а реализации 2PC с брокерами сообщений крайне сложны в эксплуатации и плохо переносят сбои координатора.

Все три пути упираются в одну проблему: **невозможно атомарно закоммитить транзакцию в базе и отправить сообщение в Kafka** — это две разные системы без общего менеджера транзакций.

Принцип решения

Раз нельзя сделать две операции атомарными, нужно свести их к одной. Именно это делает паттерн **Transactional Outbox**.

Вместо того чтобы отправлять событие в Kafka напрямую, мы записываем его в таблицу `outbox` — в той же базе, в той же транзакции, что и изменение остатка. Одна транзакция, одна база, полная атомарность за счёт ACID.

			BEGIN;

  UPDATE accounts SET amount = 500 WHERE account_id = 42;

  INSERT INTO outbox (event_type, payload) VALUES ('BalanceChanged', '...');

COMMIT;  -- либо оба изменения, либо ни одного
		

Доставкой события из `outbox` в Kafka занимается отдельный процесс — уже без транзакционных рисков, с возможностью повторных попыток.

Это и есть весь паттерн. Дальше — вопрос реализации.

Два способа реализации

Есть два принципиально разных подхода к тому, как читать `outbox` и доставлять события в Kafka. Выбор между ними определяет операционную сложность и задержку доставки.

Способ 1: Polling Relay

Фоновый воркер периодически опрашивает 'outbox' и отправляет неотправленные события в Kafka.

Структура таблицы outbox:

			CREATE TABLE outbox (

    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    event_type  VARCHAR(100)  NOT NULL,

    payload     JSONB         NOT NULL,

    created_at  TIMESTAMP     NOT NULL DEFAULT now(),

    sent_at     TIMESTAMP     NULL

);
		

Бизнес-транзакция:

			@Transactional

    accountRepository.updateBalance(accountId, newAmount);

    String payload;

    try {

        payload = objectMapper.writeValueAsString(Map.of(

            "accountId", accountId,

            "changedAt", Instant.now()

        ));

    } catch (JsonProcessingException e) {

        throw new RuntimeException("Failed to serialize outbox payload", e);

    }

    outboxRepository.save(OutboxEvent.builder()

        .eventType("BalanceChanged")

        .payload(payload)

        .build());

    // @Transactional закоммитит UPDATE и INSERT атомарно

}
		

Relay-процесс:

При нескольких экземплярах ретранслятора важно, чтобы одно событие забирал только один из них. Используем `FOR UPDATE SKIP LOCKED`:

			SELECT * FROM outbox

WHERE sent_at IS NULL

ORDER BY created_at

LIMIT 10

FOR UPDATE SKIP LOCKED;
		
			@Scheduled(fixedDelay = 500) // задержка доставки — до 500мс; уменьшайте если нужно быстрее

@Transactional

public void relayEvents() {

    List<OutboxEvent> events = outboxRepository.fetchUnsentBatch(10);

    for (OutboxEvent event : events) {

        try {

            // .get() блокирует до ack от Kafka.

            // Без него send() возвращает Future немедленно —

            // при сбое брокера исключение не будет поймано и событие потеряется.

            kafkaTemplate.send("balance.events", event.getPayload()).get();

            event.markAsSent();

            outboxRepository.save(event);

        } catch (Exception e) {

            log.error("Failed to relay event {}: {}", event.getId(), e.getMessage());

            // sent_at остаётся NULL → повтор в следующем цикле

        }

    }

}
		

Когда выбирать: задержка доставки в сотни миллисекунд допустима, хочется минимум зависимостей, команда не готова к операционной сложности CDC.

Способ 2: CDC через Debezium

CDC (Change Data Capture) — подход, при котором события читаются не опросом таблицы, а напрямую из журнала транзакций базы данных (WAL в PostgreSQL). [Debezium](https://debezium.io/) подключается к PostgreSQL как репликационный слот и стримит каждое изменение в `outbox` в Kafka в реальном времени.

Relay-процесс при этом не нужен вообще.

Бизнес-транзакция остаётся той же — пишем в `accounts` и `outbox` атомарно. Debezium сам следит за WAL и публикует новые записи в Kafka с минимальной задержкой.

Когда выбирать: нужна задержка доставки в миллисекунды, команда готова к Debezium + Kafka Connect в инфраструктуре.

Что выбрать для синхронизации остатков

Для большинства задач с остатками по счетам polling relay достаточен. CDC имеет смысл, если бизнес требует обновления клиентского баланса быстрее чем за секунду.

Получатель: защита от дублей

Оба способа доставляют события как минимум один раз — при повторных попытках одно событие может прийти дважды. На стороне Остатков по клиентам нужна защита.

Самый надёжный способ — inbox-таблица с уникальным constraint:

			CREATE TABLE inbox (

    event_id    UUID PRIMARY KEY,

    received_at TIMESTAMP NOT NULL DEFAULT now()

);
		
			@KafkaListener(topics = "balance.events")

public void handleBalanceChanged(BalanceChangedEvent event) {

    try {

        // Резервируем event_id первым.

        // Уникальный constraint не даст двум параллельным

        // консюмерам обработать одно событие дважды.

        inboxRepository.save(new InboxRecord(event.getEventId()));

    } catch (DataIntegrityViolationException e) {

        log.info("Duplicate event {}, skipping", event.getEventId());

        return;

    }

    aggregationService.applyBalanceChange(event);

}
		

>Почему не `existsById` перед вставкой? При параллельной обработке два консюмера могут одновременно получить `false` и оба начать обработку. Уникальный constraint на уровне БД закрывает эту гонку.

Что сломается при небрежной реализации

Relay внутри API-сервиса. Если ретранслятор живёт в том же процессе, что и API, горизонтальное масштабирование до 10–30 реплик создаёт столько же потоков, опрашивающих `outbox`. База тратит CPU не на запросы, а на управление блокировками. Relay — отдельный деплоймент, который масштабируется по объёму событий, а не по HTTP-трафику.

Таблица outbox незаметно растёт. Без очистки запросы замедляются, индексы раздуваются. Настройте автоматическую очистку:

			DELETE FROM outbox

  AND sent_at < now() - INTERVAL '24 hours';

DELETE FROM inbox

WHERE received_at < now() - INTERVAL '6 hours';
		

Отсутствие мониторинга outbox. Outbox — скрытая очередь внутри базы. Если relay начнёт отставать, вы узнаете об этом от клиентов, а не от алертов. Следите за количеством и возрастом необработанных событий — это главные сигналы проблемы.

Итог

Проблема потери событий при синхронизации баз — не баг конкретной реализации. Это фундаментальное ограничение: нельзя атомарно закоммитить транзакцию в базе и отправить сообщение в Kafka.

Transactional Outbox обходит это ограничение, сводя две операции к одной: событие пишется в `outbox` в той же транзакции, что и основные данные. Дальше — дело техники: polling relay, если нужна простота, CDC, если нужна скорость.

Остатки по счетам и клиентам будут согласованы — даже если между сервисами что-то пойдёт не так.