Axon — это один из немногих фреймворков, который позволяет создавать гибкие микросервисные решения, используя архитектурные паттерны Event Sourcing и CQRS.
Подход Event Sourcing сильно отличается от обычного метода создания приложений, когда для сохранения, изменения и выборки состояния объектов используется одно и тоже хранилище в базе данных.
В Event Sourcing каждое действие, связанное с состоянием объекта, т.е. создание, изменение или запросы на получение, связано с событиями (Event). Для представления состояния используется агрегат (Aggregate). Для его изменения требуется создать определённое событие, которое несёт ограниченную логику изменения конкретной части агрегата. При этом сохраняются только события, а не сам агрегат и его состояние.
Шаблон CQRS (Command Query Responsibility Segregation) представляет возможность разделения операций, обновляющих состояние, от операций, запрашивающих состояние.
В этой статье мы создадим простое приложение на примере библиотеки книг с использованием Spring Boot и Axon Framework.
Содержание:
- Axon Server и конфигурация Spring Boot-проекта
- Модель команд, событий и запросов
- Создание агрегационной модели
- Проекция агрегата
- Обработка запросов и событий
- Создание REST-контроллера
- Итоги
Шаг первый. Axon Server и конфигурация Spring Boot-проекта
Для хранилища событий, маршрутизации команд, событий и запросов Axon использует собственный Server. Он позволяет разным приложениям на основе Axon Framework общаться друг с другом, передавая различные типы сообщений из одной службы в другую для выполнения каких-либо действий. Axon Server можно запустить через docker:
docker run -p 8124:8124 axoniq/axonserver
Или скачать jar через официальный сайт.
Для быстрого формирования проекта можно использовать Spring Initializr. В качестве зависимостей добавляем:
- Spring Web
- Spring Data JPA
- H2 Database
- Lombok
Скачиваем сгенерированный проект. Открываем его через удобную IDE, в pom.xml
добавляем следующую зависимость для автоматической конфигурации Axon Framework:
<dependencies>
...
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.5</version>
</dependency>
</dependencies>
Подробнее о работе со Spring Initializr можно узнать из статьи про Spring Boot.
Шаг второй. Модель команд, событий и запросов
Начнём описание модели с описания команд. Команды предназначены для передачи инструкций изменения состояния агрегата и представляются простыми объектами, которые содержат все данные для обработки и выполнения команды. Класс именуется в повелительном наклонении, представляя собой цель, которую требуется выполнить. Для нашего приложения нужны будут следующие возможности:
- регистрация (добавление) книги в библиотеку,
- аренда книги,
- возврат книги.
Для каждой из возможностей создадим соответствующие классы представления команды. В каждый класс добавим поля, необходимые для выполнения команды. Для регистрации нужна информация о книге и количество её экземпляров. Для аренды и возврата добавляем имя арендатора. Для идентификации команд в агрегационной модели используем аннотацию @TargetAggregateIdentifier
над идентификатором книги:
package ru.tproger.axondemo.domain.commands;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
//команда регистрации книги
@Data
public class RegisterBookCommand {
@TargetAggregateIdentifier
private final String bookId;
private final String title;
private final String description;
private final Integer amount;
}
package ru.tproger.axondemo.domain.commands;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
//команда аренды книги
@Data
public class BorrowBookCommand {
@TargetAggregateIdentifier
private final String bookId;
private final String fullName;
}
package ru.tproger.axondemo.domain.commands;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
//команда возврата книги
@Data
public class ReturnBookCommand {
@TargetAggregateIdentifier
private final String bookId;
private final String fullName;
}
Перейдём к созданию событий. События именуются в прошедшем времени и сообщают информацию об изменении агрегата всем заинтересованным подписчикам. Аналогично создадим соответствующие классы для регистрации, аренды и возврата книги:
package ru.tproger.axondemo.domain.events;
import lombok.Data;
//событие регистрации книги
@Data
public class BookRegisteredEvent {
private final String bookId;
private final String title;
private final String description;
private final Integer amount;
}
package ru.tproger.axondemo.domain.events;
import lombok.Data;
//событие аренды книги
@Data
public class BookBorrowedEvent {
private final String bookId;
private final String fullName;
}
package ru.tproger.axondemo.domain.events;
import lombok.Data;
//событие возврата книги
@Data
public class BookReturnedEvent {
private final String bookId;
private final String fullName;
}
В запросах передаются инструкции для извлечения состояния агрегатов. Наименование класса отражает его предназначение. Используем поля в качестве критериев выборки нужных данных. Добавим запрос для выборки книги по bookId
и запрос для поиска всех книг с фильтром по наименованию:
package ru.tproger.axondemo.domain.queries;
import lombok.Data;
//запрос на выборку книги по bookId
@Data
public class BookQuery {
private final String bookId;
}
package ru.tproger.axondemo.domain.queries;
import lombok.Data;
//запрос на выборку книг с фильтром по title
@Data
public class ListBookQuery {
private final String title;
}
Шаг третий. Агрегат
Переходим к созданию агрегационной модели. Для этого добавим класс Book с аннотацией @Aggregate
, с помощью которой Axon узнаёт, что класс представляет собой агрегат. Также добавляем поле bookId
с аннотацией @AggregateIdentifier
для идентификации агрегата:
package ru.tproger.axondemo.domain.model;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
@Aggregate
public class Book {
@AggregateIdentifier
private String bookId;
...
}
Для обеспечения состояния агрегата добавим поля, отражающие количество доступных экземпляров книги и текущих арендаторов книги:
…
@Aggregate
public class Book {
@AggregateIdentifier
private String bookId;
private Integer amount;
private Set<String> tenants;
...
}
Для каждой из ранее созданных команд в агрегат необходимо добавить обработчик (Command Handler). Самой первой командой является регистрация книги, которая будет определять создание агрегата. Добавляем конструктор с соответствующим аргументом экземпляра команды. Над конструктором добавляем аннотацию @CommandHandler
:
…
@org.axonframework.commandhandling.CommandHandler
public Book(RegisterBookCommand command) {
log.info("Handling RegisterBookCommand: {}", command);
}
...
Экземпляр данного класса будет создаваться при регистрации новой книги. Внутри обработчика можно добавить логику для проверки корректности команды, например, валидацию входных данных или авторизацию. Добавим валидацию, что количество книг в команде положительное:
…
@CommandHandler
public Book(RegisterBookCommand command) {
log.info("Handling RegisterBookCommand: {}", command);
if (command.getAmount() <= 0)
throw new IllegalArgumentException("Amount must be positive");
}
...
Следует отметить, что обработчик команды не должен менять состояние агрегата. Аналогично создаём обработчики для аренды книги и для её возврата, но уже используем методы вместо конструктора, т.к. теперь мы работаем с экземпляром агрегационной модели. Каждый раз, когда будет создаваться команда, она будет обрабатываться в экземпляре агрегата, имеющим соответствующий идентификатор bookId
:
…
@CommandHandler
public void handle(BorrowBookCommand command) {
log.info("Handling BorrowBookCommand: {}", command);
if (amount < 0)
throw new IllegalArgumentException("Book out of stock");
if (tenants.contains(command.getFullName()))
throw new IllegalArgumentException("Book already borrowed by this person");
}
@CommandHandler
public void handle(ReturnBookCommand command) {
log.info("Handling ReturnBookCommand: {}", command);
if (!tenants.contains(command.getFullName()))
throw new IllegalArgumentException("Book must be returned by person who has borrowed it");
}
...
Для обработки ранее созданных событий добавляем обработчики событий с помощью методов с аргументом экземпляра соответствующего события. Каждый из методов отмечаем аннотацией @EventSourcingHandler
. Axon с её помощью сопровождает изменение состояния агрегата в соответствии с паттерном Event Sourcing. В первом обработчике события (регистрация книги) необходимо инициализировать состояние агрегата. При аренде книги меняем состояние путем добавления арендатора и уменьшения количества экземпляров книги. При возврате проводим обратные операции:
...
@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookRegisteredEvent event) {
log.info("Applying BookRegisteredEvent: {}", event);
bookId = event.getBookId();
amount = event.getAmount();
tenants = new HashSet<>();
}
@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookBorrowedEvent event) {
log.info("Applying BookBorrowedEvent: {}", event);
amount--;
tenants.add(event.getFullName());
}
@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookReturnedEvent event) {
log.info("Applying BookReturnedEvent: {}", event);
amount++;
tenants.remove(event.getFullName());
}
...
Вернёмся к обработчикам команд и добавим вызов событий для обработки. Для этого Axon представляет класс org.axonframework.modelling.command.AggregateLifecycle
со статическим методом apply(Object payload)
для публикации сгенерированных событий:
...
@CommandHandler
public Book(RegisterBookCommand command) {
... //command validation
apply(new BookRegisteredEvent(command.getBookId(), command.getTitle(), command.getDescription(), command.getAmount()));
}
@CommandHandler
public void handle(BorrowBookCommand command) {
... //command validation
apply(new BookBorrowedEvent(command.getBookId(), command.getFullName()));
}
@CommandHandler
public void handle(ReturnBookCommand command) {
... //command validation
apply(new BookReturnedEvent(command.getBookId(), command.getFullName()));
}
...
Завершающим шагом для работы агрегата будет добавление конструктора без аргументов. Это требуется для воссоздания состояния агрегата в @EventSourcingHandler
обработчиках при воспроизведении ранее произошедших событий.
Шаг четвертый. Проекция агрегата
По паттерну Event Sourcing состояние агрегата не хранится в базе данных. Сохранятся только события в специальном хранилище (Event Store). Это не подходит для выборки состояния агрегата при запросе данных, т.к. каждый раз потребует выполнение всех @EventSourcingHandler
обработчиков. Для разрешения этого вопроса будем использовать проекцию агрегата (Aggregate Projection) в виде дополнительного хранилища. В нём будет поддерживаться актуальная информация для чтения состояния агрегата, которое будет обновляться с помощью подписки на ранее созданные события. Добавим entity и repository для хранения проекции агрегата:
package ru.tproger.axondemo.domain.projections;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.Id;
@Data
@AllArgsConstructor
@NoArgsConstructor(force = true)
@Entity
public class BookView {
@Id
private final String bookId;
private final String title;
private final String description;
private Integer amount;
}
package ru.tproger.axondemo.domain.projections;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
public interface BookViewRepository extends JpaRepository<BookView, String> {
List<BookView> findByTitleContaining(String title);
}
Шаг пятый. Обработка запросов и событий
Реализуем обработчики запросов, созданных ранее, по аналогии с обработчиками команд и событий. Над каждым методом, принимающим объект запроса, добавим аннотацию @QueryHandler
:
package ru.tproger.axondemo.domain.queryhandlers;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.projections.BookViewRepository;
import ru.tproger.axondemo.domain.queries.BookQuery;
import ru.tproger.axondemo.domain.queries.ListBookQuery;
import java.util.List;
import java.util.Optional;
@Slf4j
@Component
public class BookQueryHandler {
@Autowired
private BookViewRepository bookViewRepository;
@QueryHandler
public List<BookView> handle(ListBookQuery query) {
log.info("Handling ListBookQuery: {}", query);
return bookViewRepository.findByTitleContaining(query.getTitle());
}
@QueryHandler
public BookView handle(BookQuery query) {
log.info("Handling BookQuery: {}", query);
Optional<BookView> book = bookViewRepository.findById(query.getBookId());
if (book.isEmpty())
throw new IllegalArgumentException("Book not found");
return book.get();
}
}
Для сохранения проекции агрегата необходимо добавить обработчики событий, но уже вне агрегата, не меняя его состояние. Для этого вместо аннотации @EventSourcingHandler
используем @EventHandler
:
package ru.tproger.axondemo.interfaces.events;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import ru.tproger.axondemo.domain.events.BookBorrowedEvent;
import ru.tproger.axondemo.domain.events.BookRegisteredEvent;
import ru.tproger.axondemo.domain.events.BookReturnedEvent;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.projections.BookViewRepository;
@Slf4j
@Component
public class BookProjectionEventHandler {
@Autowired
private BookViewRepository bookViewRepository;
@EventHandler
public void bookRegisteredEventHandler(BookRegisteredEvent event) {
log.info("Applying BookRegisteredEvent: {}", event);
BookView bookView = new BookView(event.getBookId(), event.getTitle(), event.getDescription(), event.getAmount());
bookViewRepository.save(bookView);
}
@EventHandler
public void bookBorrowedEventHandler(BookBorrowedEvent event) {
log.info("Applying BookBorrowedEvent: {}", event);
BookView bookView = getBookView(event.getBookId());
bookView.setAmount(bookView.getAmount() - 1);
}
@EventHandler
public void bookReturnedEventHandler(BookReturnedEvent event) {
log.info("Applying BookReturnedEvent: {}", event);
BookView bookView = getBookView(event.getBookId());
bookView.setAmount(bookView.getAmount() + 1);
}
private BookView getBookView(String bookId) {
return bookViewRepository.findById(bookId)
.orElseThrow(() -> new IllegalArgumentException("Book not found"));
}
}
Шаг шестой. REST-контроллер
Последним этапом будет создание REST-контроллера для использования команд и событий через веб-интерфейс. Для создания команд инжектируем интерфейс CommandGateway
. Он представляет собой обертку для интерфейса шины команд CommandBus
, в качестве реализации по умолчанию используется AxonServerCommandBus
, который работает через запущенный на первом шаге Axon Server. Для обработки запросов нужно заинжектировать интерфейс QueryGateway
, использующийся в качестве обертки для интерфейса шины запросов QueryBus
, для которого по умолчанию используется реализация AxonServerQueryBus
:
package ru.tproger.axondemo.interfaces.rest;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import ru.tproger.axondemo.domain.commands.BorrowBookCommand;
import ru.tproger.axondemo.domain.commands.RegisterBookCommand;
import ru.tproger.axondemo.domain.commands.ReturnBookCommand;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.queries.BookQuery;
import ru.tproger.axondemo.domain.queries.ListBookQuery;
import java.util.List;
@RestController
@RequestMapping("/book")
public class BookController {
@Autowired
private CommandGateway commandGateway;
@Autowired
private QueryGateway queryGateway;
@PostMapping
@ResponseStatus(HttpStatus.NO_CONTENT)
public void registerBook(@RequestBody RegisterBookDto dto) {
commandGateway.sendAndWait(
new RegisterBookCommand(dto.getId(), dto.getTitle(), dto.getDescription(), dto.getAmount())
);
}
@PutMapping("/{bookId}/borrow")
@ResponseStatus(HttpStatus.NO_CONTENT)
public void borrowBook(@PathVariable String bookId,
@RequestBody BorrowBookDto dto) {
commandGateway.sendAndWait(
new BorrowBookCommand(bookId, dto.getFullName())
);
}
@PutMapping("/{bookId}/return")
@ResponseStatus(HttpStatus.NO_CONTENT)
public void returnBook(@PathVariable String bookId,
@RequestBody ReturnBookDto dto) {
commandGateway.sendAndWait(
new ReturnBookCommand(bookId, dto.getFullName())
);
}
@GetMapping("/{bookId}")
public BookView getBook(@PathVariable String bookId) {
return queryGateway.query(new BookQuery(bookId), BookView.class).join();
}
@GetMapping
public List<BookView> getBookList(@RequestParam(required = false, defaultValue = "") String title) {
return queryGateway
.query(new ListBookQuery(title), ResponseTypes.multipleInstancesOf(BookView.class))
.join();
}
}
Запускаем приложение как обычное Spring Boot-приложение.
Итоги
В этой статье мы познакомились с неординарным способом создания приложений с использованием паттернов Event Sourcing и CQRS. Данная связка может значительно упростить работу в сложных системах за счет обширной модели предметной области, отслеживания истории состояния, возможности аудита и аналитики из коробки, а также разделения обязанностей между разработчиками при разработке интерфейсов чтения и записи данных.