Python CQRS: Пишем распределенные системы без боли (Sagas, Outbox, Event Driven)
Устали писать велосипеды для согласованности данных в микросервисах? Когда простой запрос превращается в квест с Transactional Outbox и Сагами, код быстро становится неподдерживаемым. В этом посте техлид Timeweb Cloud представляет python-cqrs — open-source фреймворк, который берет на себя всю сложность Event-Driven архитектуры. Внутри: — Паттерн Mediator для максимально тонкого презентационного слоя (FastAPI/CLI). — Оркестрируемые Саги с автоматическим откатом (Compensating Transactions) и сохранением состояния. — Честный Transactional Outbox для гарантии доставки событий. — Streaming-хендлеры и полная типизация на Pydantic v2. Разбираем, как перестать бояться eventual consistency и начать писать чистый бизнес-код.
57 открытий2К показов
Говорят, что распределенная система — это когда вы не можете работать, потому что упал сервер, о существовании которого вы даже не подозревали. А если этот сервер ещё и успел списать деньги с карты пользователя, но “забыл” сообщить об этом складу — добро пожаловать в увлекательный мир согласованности данных, где COMMIT больше не гарантирует вам спокойный сон.
Всем привет! Меня зовут Вадим, я техлид команды финансовой инфраструктуры в Timeweb Cloud. Сегодня я хочу рассказать об инструменте, который лежит в основе наших Python-микросервисов. Это пилотный пост-знакомство: без долгих историй “как мы дошли до жизни такой” (большая статья с архитектурными кейсами уже готовится), а чисто по делу.
Если вы пишете сложные распределенные системы и устали каждый раз заново изобретать велосипеды для Transactional Outbox, Sagas или Mediator — этот пост сэкономит вам кучу времени.
Представляю python-cqrs — фреймворк для реализации паттернов CQRS и Event-Driven Architecture. Он вырос из форка проекта diator, но превратился в самостоятельный комбайн для построения надежных систем.
Killer Features
Честный CQS и Mediator (База)
По названию может показаться, что пакет исключительно про сложный CQRS и Event Sourcing. На самом деле, название родилось исторически: мы искали инструмент именно для ES-сервиса. Но в итоге создали универсальную реализацию паттернов Mediator и CQS (Command Query Separation).
Суть проста: мы разделяем операции изменения состояния (Команды) и чтения данных (Запросы). Это позволяет изолировать чистую бизнес-логику от инфраструктуры (HTTP, Kafka, CLI). Хендлер ничего не знает о том, как пришел запрос, он просто выполняет задачу.
Распределенные транзакции (Saga Pattern)
Библиотека реализует Оркестрированные Саги с сохранением состояния:
- Автоматическая компенсация: Если шаг падает, фреймворк автоматически запускает логику отката (compensate) в обратном порядке для всех уже выполненных шагов.
- Recovery Mechanism: Если под (pod) сервиса был убит (OOM, редеплой) посередине транзакции, при рестарте механизм восстановления вычитает состояние из SagaStorage и продолжит выполнение с прерванного места (или продолжит компенсацию).
- SagaStorage & SagaLog: Состояние саги персистентно. Каждый шаг (успех, ошибка, начало компенсации) пишется в SagaLog. Это позволяет не терять контекст транзакции даже при падении всего кластера.
- Конкурентность: В SagaMediator встроена защита от гонок (race conditions). Даже если несколько инстансов сервиса попытаются обработать события одной саги одновременно, механизм блокировок (на уровне хранилища) гарантирует последовательное выполнение шагов.
Transactional Outbox
Классическая проблема “Dual Write”: мы сохранили сущность в БД, но упали перед отправкой события в Kafka. В итоге — рассинхрон. python-cqrs реализует Transactional Outbox, гарантируя, что события сохраняются в той же транзакции БД, что и бизнес-данные.
Streaming Request Handlers
Нужно обработать большой файл или вернуть прогресс-бар пользователю? StreamingRequestMediator позволяет отдавать данные по кускам (yield), идеально ложась на Server-Sent Events (SSE) в FastAPI.
Разделение Domain Events и Notification Events
В python-cqrs мы четко разделяем события на два типа. Это помогает не смешивать внутреннюю логику модуля с публичным контрактом сервиса.
Domain Event (Внутренние)
Это события, которые происходят внутри вашего bounded context. Они нужны, чтобы снизить зацепление (coupling) между компонентами одной системы.
- Как работают: Хендлер возвращает их через свойство events. Mediator перехватывает их и тут же (или в фоне) рассылает локальным подписчикам.
- Пример: “Заказ создан” -> (внутри того же сервиса) -> “Отправить email”, “Обновить аналитику”.
- ⚠️ Важно: Этот механизм работает In-Memory и не дает гарантий доставки (at-most-once). Если процесс упадет во время обработки доменного события, оно (и его сайд-эффекты) будет потеряно.
Notification Events (Публичные / Интеграционные)
Это события, которые улетают наружу — в Kafka/RabbitMQ для других микросервисов.
- Как работают: Это обертки (NotificationEvent), которые содержат топик и пейлоад. Именно их мы сохраняем в Transactional Outbox, чтобы гарантировать доставку.
- EventMediator: Для их обработки на стороне потребителя (Consumer) есть отдельный EventMediator, который умеет маршрутизировать входящие сообщения из брокера.
Почему это круто? Вы можете менять внутреннюю структуру доменных событий как угодно, не ломая контракты с внешними сервисами. Публичный API (Notification Events) стабилен, внутренняя кухня (Domain Events) гибка.
Бесшовная интеграция (FastAPI & FastStream)
python-cqrs проектировался так, чтобы быть “клеем” между вашей бизнес-логикой и внешним миром. Интеграция происходит максимально нативно через Dependency Injection и встроенные bootstrap-утилиты.
FastAPI (HTTP API)
Вам не нужно вручную собирать медиатор. Используйте cqrs.requests.bootstrap для фабрики и fastapi.Depends для внедрения.
FastStream (Kafka Consumer)
Для обработки входящих Notification-событий из Kafka используем cqrs.events.bootstrap для создания EventMediator.
Итог: Ваш Presentation Layer занимается только транспортом. Вся конфигурация собрана в bootstrap, а логика — в Handlers.
Другие фичи
- Modern Python: Полная типизация, поддержка Pydantic v2.
- Dependency Injection: “First-class citizen” поддержка популярных DI-фреймворков. Есть готовые интеграции как для легковесного di, так и для мощного dependency-injector. Вы просто объявляете зависимости в init хендлера, и они “прилетают” автоматически.
- Event-Driven & Protobuf:
– Нативная поддержка Kafka (используем aiokafka под капотом, отлично дружит с faststream).
– Поддержка Protobuf для Notification-событий “из коробки” — критично для высоконагруженных систем, где важен размер сообщения. - Parallel Event Processing: Хотите обрабатывать 10 доменных событий одной команды параллельно, но не положить базу? Просто настройте max_concurrent_event_handlers в медиаторе.
- Chain of Responsibility: Цепочки обработчиков с возможностью построения диаграмм.
- Mermaid Visualizations: Автоматическая генерация диаграмм для Saga и Chain of Responsibility. Планируем расширить это на все типы обработчиков, чтобы документация всегда соответствовала коду.
- Bootstrap Tools: Набор утилит для быстрой настройки Presentation Layer (FastAPI, CLI). Не нужно писать тонны бойлерплейта, чтобы собрать медиатор, мапперы и DI-контейнер воедино — есть удобные bootstrap-функции.
Планы на будущее (Roadmap)
Мы активно работаем над тем, чтобы библиотека следовала всем стандартам для HighLoad на Python. Вот ближайшие наши планы
Архитектура и Гибкость
- Отвязка от Pydantic: Сделать ядро библиотеки агностиком к моделям данных. Поддержка dataclasses, attrs, msgspec или простых TypedDict для тех, кому важна максимальная производительность и минимум зависимостей.
Observability
- OpenTelemetry: Автоматический проброс trace_id сквозь цепочку Command → Saga → Event.
- Метрики: Middleware для сбора Prometheus-метрик (длительность хендлеров, счетчики ошибок, лаг саг).
Resiliency
- Idempotency Middleware: Механизм дедупликации сообщений (Redis/DB) для безопасной обработки “at-least-once”.
- Circuit Breaker: Защита от каскадных сбоев внешних сервисов.
- Unified Transport: Единый интерфейс для переключения между RabbitMQ, Redis Streams и NATS.
Developer Experience
- Testing Harness: Удобный API для написания интеграционных тестов бизнес-флоу.
- In-Memory Transport: Возможность прогонять тесты “Команда -> Событие -> Сага” в памяти без поднятия Docker-контейнеров.
Что дальше?
Как я и сказал в начале, это лишь обзорный “выстрел в воздух”, чтобы найти единомышленников и собрать первый фидбек. В следующей большой статье я планирую рассказать:
- Как мы переписывали монолит на микросервисы и почему стандартные подходы не сработали.
- Реальные кейсы использования TransactionOutbox и Саг в продакшене (где это спасло деньги).
- Почему мы отказались от готовых решений и начали пилить своё.
А пока — буду рад любым issues, звездам на GitHub и конструктивной критике в комментариях. Если вы пишете микросервисы на Python, используете Outbox-ы, CQRS или Саги в Python — расскажите, какие боли испытываете вы? Возможно, мы сможем решить их вместе.
Ссылки
1. GitHub
2. PyPi
3. Документация
4. DeepWiki
57 открытий2К показов











