Обложка: Использование паттернов Event Sourcing и CQRS для разработки приложения на Spring Boot и Axon Framework

Использование паттернов Event Sourcing и CQRS для разработки приложения на Spring Boot и Axon Framework

Александр Макеев

Александр Макеев

Software Engineer в компании Arcadia, Team Lead в компании RusSportImport

Axon — это один из немногих фреймворков, который позволяет создавать гибкие микросервисные решения, используя архитектурные паттерны Event Sourcing и CQRS.

Подход Event Sourcing сильно отличается от обычного метода создания приложений, когда для сохранения, изменения и выборки состояния объектов используется одно и тоже хранилище в базе данных.

В Event Sourcing каждое действие, связанное с состоянием объекта, т.е. создание, изменение или запросы на получение, связано с событиями (Event). Для представления состояния используется агрегат (Aggregate). Для его изменения требуется создать определённое событие, которое несёт ограниченную логику изменения конкретной части агрегата. При этом сохраняются только события, а не сам агрегат и его состояние.

Шаблон CQRS (Command Query Responsibility Segregation) представляет возможность разделения операций, обновляющих состояние, от операций, запрашивающих состояние.

В этой статье мы создадим простое приложение на примере библиотеки книг с использованием Spring Boot и Axon Framework.

Содержание:

Шаг первый. Axon Server и конфигурация Spring Boot-проекта

Для хранилища событий, маршрутизации команд, событий и запросов Axon использует собственный Server. Он позволяет разным приложениям на основе Axon Framework общаться друг с другом, передавая различные типы сообщений из одной службы в другую для выполнения каких-либо действий. Axon Server можно запустить через docker:

docker run -p 8124:8124 axoniq/axonserver

Или скачать jar через официальный сайт.

Для быстрого формирования проекта можно использовать Spring Initializr. В качестве зависимостей добавляем:

  1. Spring Web
  2. Spring Data JPA
  3. H2 Database
  4. Lombok

Скачиваем сгенерированный проект. Открываем его через удобную IDE, в pom.xml добавляем следующую зависимость для автоматической конфигурации Axon Framework:

<dependencies>
	...
    <dependency>
    	<groupId>org.axonframework</groupId>
    	<artifactId>axon-spring-boot-starter</artifactId>
    	<version>4.5</version>
    </dependency>
</dependencies>

Подробнее о работе со Spring Initializr можно узнать из статьи про Spring Boot.

Шаг второй. Модель команд, событий и запросов

Начнём описание модели с описания команд. Команды предназначены для передачи инструкций изменения состояния агрегата и представляются простыми объектами, которые содержат все данные для обработки и выполнения команды. Класс именуется в повелительном наклонении, представляя собой цель, которую требуется выполнить. Для нашего приложения нужны будут следующие возможности:

  • регистрация (добавление) книги в библиотеку,
  • аренда книги,
  • возврат книги.

Для каждой из возможностей создадим соответствующие классы представления команды. В каждый класс добавим поля, необходимые для выполнения команды. Для регистрации нужна информация о книге и количество её экземпляров. Для аренды и возврата добавляем имя арендатора. Для идентификации команд в агрегационной модели используем аннотацию @TargetAggregateIdentifier над идентификатором книги:

package ru.tproger.axondemo.domain.commands;

import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;

//команда регистрации книги
@Data
public class RegisterBookCommand {
   @TargetAggregateIdentifier
   private final String bookId;
   private final String title;
   private final String description;
   private final Integer amount;
}
package ru.tproger.axondemo.domain.commands;

import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;

//команда аренды книги
@Data
public class BorrowBookCommand {
   @TargetAggregateIdentifier
   private final String bookId;
   private final String fullName;
}
package ru.tproger.axondemo.domain.commands;

import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;

//команда возврата книги
@Data
public class ReturnBookCommand {
   @TargetAggregateIdentifier
   private final String bookId;
   private final String fullName;
}

Перейдём к созданию событий. События именуются в прошедшем времени и сообщают информацию об изменении агрегата всем заинтересованным подписчикам. Аналогично создадим соответствующие классы для регистрации, аренды и возврата книги:

package ru.tproger.axondemo.domain.events;

import lombok.Data;

//событие регистрации книги
@Data
public class BookRegisteredEvent {
   private final String bookId;
   private final String title;
   private final String description;
   private final Integer amount;
}
package ru.tproger.axondemo.domain.events;

import lombok.Data;

//событие аренды книги
@Data
public class BookBorrowedEvent {
   private final String bookId;
   private final String fullName;
}
package ru.tproger.axondemo.domain.events;

import lombok.Data;

//событие возврата книги
@Data
public class BookReturnedEvent {
   private final String bookId;
   private final String fullName;
}

В запросах передаются инструкции для извлечения состояния агрегатов. Наименование класса отражает его предназначение. Используем поля в качестве критериев выборки нужных данных. Добавим запрос для выборки книги по bookId и запрос для поиска всех книг с фильтром по наименованию:

package ru.tproger.axondemo.domain.queries;

import lombok.Data;

//запрос на выборку книги по bookId
@Data
public class BookQuery {
   private final String bookId;
}
package ru.tproger.axondemo.domain.queries;

import lombok.Data;

//запрос на выборку книг с фильтром по title
@Data
public class ListBookQuery {
   private final String title;
}

Шаг третий. Агрегат

Переходим к созданию агрегационной модели. Для этого добавим класс Book с аннотацией @Aggregate, с помощью которой Axon узнаёт, что класс представляет собой агрегат. Также добавляем поле bookId с аннотацией @AggregateIdentifier для идентификации агрегата:

package ru.tproger.axondemo.domain.model;

import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;

@Aggregate
public class Book {
   @AggregateIdentifier
   private String bookId;
   ...
}

Для обеспечения состояния агрегата добавим поля, отражающие количество доступных экземпляров книги и текущих арендаторов книги:

…
@Aggregate
public class Book {
   @AggregateIdentifier
   private String bookId;
   private Integer amount;
   private Set<String> tenants;
   ...
}

Для каждой из ранее созданных команд в агрегат необходимо добавить обработчик (Command Handler). Самой первой командой является регистрация книги, которая будет определять создание агрегата. Добавляем конструктор с соответствующим аргументом экземпляра команды. Над конструктором добавляем аннотацию @CommandHandler:

…
@org.axonframework.commandhandling.CommandHandler
public Book(RegisterBookCommand command) {
   log.info("Handling RegisterBookCommand: {}", command);
}
...

Экземпляр данного класса будет создаваться при регистрации новой книги. Внутри обработчика можно добавить логику для проверки корректности команды, например, валидацию входных данных или авторизацию. Добавим валидацию, что количество книг в команде положительное:

…
@CommandHandler
public Book(RegisterBookCommand command) {
   log.info("Handling RegisterBookCommand: {}", command);

   if (command.getAmount() <= 0)
       throw new IllegalArgumentException("Amount must be positive");
}
...

Следует отметить, что обработчик команды не должен менять состояние агрегата. Аналогично создаём обработчики для аренды книги и для её возврата, но уже используем методы вместо конструктора, т.к. теперь мы работаем с экземпляром агрегационной модели. Каждый раз, когда будет создаваться команда, она будет обрабатываться в экземпляре агрегата, имеющим соответствующий идентификатор bookId:

…
@CommandHandler
public void handle(BorrowBookCommand command) {
   log.info("Handling BorrowBookCommand: {}", command);

   if (amount < 0)
       throw new IllegalArgumentException("Book out of stock");
   if (tenants.contains(command.getFullName()))
       throw new IllegalArgumentException("Book already borrowed by this person");
}

@CommandHandler
public void handle(ReturnBookCommand command) {
   log.info("Handling ReturnBookCommand: {}", command);

   if (!tenants.contains(command.getFullName()))
       throw new IllegalArgumentException("Book must be returned by person who has borrowed it");
}
...

Для обработки ранее созданных событий добавляем обработчики событий с помощью методов с аргументом экземпляра соответствующего события. Каждый из методов отмечаем аннотацией @EventSourcingHandler. Axon с её помощью сопровождает изменение состояния агрегата в соответствии с паттерном Event Sourcing. В первом обработчике события (регистрация книги) необходимо инициализировать состояние агрегата. При аренде книги меняем состояние путем добавления арендатора и уменьшения количества экземпляров книги. При возврате проводим обратные операции:

...
@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookRegisteredEvent event) {
   log.info("Applying BookRegisteredEvent: {}", event);

   bookId = event.getBookId();
   amount = event.getAmount();
   tenants = new HashSet<>();
}

@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookBorrowedEvent event) {
   log.info("Applying BookBorrowedEvent: {}", event);

   amount--;
   tenants.add(event.getFullName());
}

@org.axonframework.eventsourcing.EventSourcingHandler
public void on(BookReturnedEvent event) {
   log.info("Applying BookReturnedEvent: {}", event);

   amount++;
   tenants.remove(event.getFullName());
}
...

Вернёмся к обработчикам команд и добавим вызов событий для обработки. Для этого Axon представляет класс org.axonframework.modelling.command.AggregateLifecycle со статическим методом apply(Object payload) для публикации сгенерированных событий:

...
@CommandHandler
public Book(RegisterBookCommand command) {
   ... //command validation
   apply(new BookRegisteredEvent(command.getBookId(), command.getTitle(), command.getDescription(), command.getAmount()));
}

@CommandHandler
public void handle(BorrowBookCommand command) {
   ... //command validation
   apply(new BookBorrowedEvent(command.getBookId(), command.getFullName()));
}

@CommandHandler
public void handle(ReturnBookCommand command) {
   ... //command validation
   apply(new BookReturnedEvent(command.getBookId(), command.getFullName()));
}
...

Завершающим шагом для работы агрегата будет добавление конструктора без аргументов. Это требуется для воссоздания состояния агрегата в @EventSourcingHandler обработчиках при воспроизведении ранее произошедших событий.

Шаг четвертый. Проекция агрегата

По паттерну Event Sourcing состояние агрегата не хранится в базе данных. Сохранятся только события в специальном хранилище (Event Store). Это не подходит для выборки состояния агрегата при запросе данных, т.к. каждый раз потребует выполнение всех @EventSourcingHandler обработчиков. Для разрешения этого вопроса будем использовать проекцию агрегата (Aggregate Projection) в виде дополнительного хранилища. В нём будет поддерживаться актуальная информация для чтения состояния агрегата, которое будет обновляться с помощью подписки на ранее созданные события. Добавим entity и repository для хранения проекции агрегата:

package ru.tproger.axondemo.domain.projections;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.Id;

@Data
@AllArgsConstructor
@NoArgsConstructor(force = true)
@Entity
public class BookView {
   @Id
   private final String bookId;
   private final String title;
   private final String description;
   private Integer amount;
}
package ru.tproger.axondemo.domain.projections;

import org.springframework.data.jpa.repository.JpaRepository;

import java.util.List;

public interface BookViewRepository extends JpaRepository<BookView, String> {
   List<BookView> findByTitleContaining(String title);
}

Шаг пятый. Обработка запросов и событий

Реализуем обработчики запросов, созданных ранее, по аналогии с обработчиками команд и событий. Над каждым методом, принимающим объект запроса, добавим аннотацию @QueryHandler:

package ru.tproger.axondemo.domain.queryhandlers;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.projections.BookViewRepository;
import ru.tproger.axondemo.domain.queries.BookQuery;
import ru.tproger.axondemo.domain.queries.ListBookQuery;

import java.util.List;
import java.util.Optional;

@Slf4j
@Component
public class BookQueryHandler {

   @Autowired
   private BookViewRepository bookViewRepository;

   @QueryHandler
   public List<BookView> handle(ListBookQuery query) {
       log.info("Handling ListBookQuery: {}", query);

       return bookViewRepository.findByTitleContaining(query.getTitle());
   }

   @QueryHandler
   public BookView handle(BookQuery query) {
       log.info("Handling BookQuery: {}", query);

       Optional<BookView> book = bookViewRepository.findById(query.getBookId());
       if (book.isEmpty())
           throw new IllegalArgumentException("Book not found");

       return book.get();
   }
}

Для сохранения проекции агрегата необходимо добавить обработчики событий, но уже вне агрегата, не меняя его состояние. Для этого вместо аннотации @EventSourcingHandler используем @EventHandler:

package ru.tproger.axondemo.interfaces.events;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import ru.tproger.axondemo.domain.events.BookBorrowedEvent;
import ru.tproger.axondemo.domain.events.BookRegisteredEvent;
import ru.tproger.axondemo.domain.events.BookReturnedEvent;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.projections.BookViewRepository;

@Slf4j
@Component
public class BookProjectionEventHandler {

   @Autowired
   private BookViewRepository bookViewRepository;

   @EventHandler
   public void bookRegisteredEventHandler(BookRegisteredEvent event) {
       log.info("Applying BookRegisteredEvent: {}", event);

       BookView bookView = new BookView(event.getBookId(), event.getTitle(), event.getDescription(), event.getAmount());
       bookViewRepository.save(bookView);
   }

   @EventHandler
   public void bookBorrowedEventHandler(BookBorrowedEvent event) {
       log.info("Applying BookBorrowedEvent: {}", event);

       BookView bookView = getBookView(event.getBookId());
       bookView.setAmount(bookView.getAmount() - 1);
   }

   @EventHandler
   public void bookReturnedEventHandler(BookReturnedEvent event) {
       log.info("Applying BookReturnedEvent: {}", event);

       BookView bookView = getBookView(event.getBookId());
       bookView.setAmount(bookView.getAmount() + 1);
   }

   private BookView getBookView(String bookId) {
       return bookViewRepository.findById(bookId)
               .orElseThrow(() -> new IllegalArgumentException("Book not found"));
   }
}

Шаг шестой. REST-контроллер

Последним этапом будет создание REST-контроллера для использования команд и событий через веб-интерфейс. Для создания команд инжектируем интерфейс CommandGateway. Он представляет собой обертку для интерфейса шины команд CommandBus, в качестве реализации по умолчанию используется AxonServerCommandBus, который работает через запущенный на первом шаге Axon Server. Для обработки запросов нужно заинжектировать интерфейс QueryGateway, использующийся в качестве обертки для интерфейса шины запросов QueryBus, для которого по умолчанию используется реализация AxonServerQueryBus:

package ru.tproger.axondemo.interfaces.rest;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import ru.tproger.axondemo.domain.commands.BorrowBookCommand;
import ru.tproger.axondemo.domain.commands.RegisterBookCommand;
import ru.tproger.axondemo.domain.commands.ReturnBookCommand;
import ru.tproger.axondemo.domain.projections.BookView;
import ru.tproger.axondemo.domain.queries.BookQuery;
import ru.tproger.axondemo.domain.queries.ListBookQuery;

import java.util.List;

@RestController
@RequestMapping("/book")
public class BookController {

   @Autowired
   private CommandGateway commandGateway;
   @Autowired
   private QueryGateway queryGateway;

   @PostMapping
   @ResponseStatus(HttpStatus.NO_CONTENT)
   public void registerBook(@RequestBody RegisterBookDto dto) {
       commandGateway.sendAndWait(
               new RegisterBookCommand(dto.getId(), dto.getTitle(), dto.getDescription(), dto.getAmount())
       );
   }

   @PutMapping("/{bookId}/borrow")
   @ResponseStatus(HttpStatus.NO_CONTENT)
   public void borrowBook(@PathVariable String bookId,
                          @RequestBody BorrowBookDto dto) {
       commandGateway.sendAndWait(
               new BorrowBookCommand(bookId, dto.getFullName())
       );
   }

   @PutMapping("/{bookId}/return")
   @ResponseStatus(HttpStatus.NO_CONTENT)
   public void returnBook(@PathVariable String bookId,
                          @RequestBody ReturnBookDto dto) {
       commandGateway.sendAndWait(
               new ReturnBookCommand(bookId, dto.getFullName())
       );
   }

   @GetMapping("/{bookId}")
   public BookView getBook(@PathVariable String bookId) {
       return queryGateway.query(new BookQuery(bookId), BookView.class).join();
   }

   @GetMapping
   public List<BookView> getBookList(@RequestParam(required = false, defaultValue = "") String title) {
       return queryGateway
               .query(new ListBookQuery(title), ResponseTypes.multipleInstancesOf(BookView.class))
               .join();
   }
}

Запускаем приложение как обычное Spring Boot-приложение.

Итоги

В этой статье мы познакомились с неординарным способом создания приложений с использованием паттернов Event Sourcing и CQRS. Данная связка может значительно упростить работу в сложных системах за счет обширной модели предметной области, отслеживания истории состояния, возможности аудита и аналитики из коробки, а также разделения обязанностей между разработчиками при разработке интерфейсов чтения и записи данных.