Akka была создана для решения проблем в высоконагруженных системах. Делается это с помощью модели акторов.
Актор сам по себе — это сущность, которая получает некий Message, обрабатывает его атомарно и посылает сообщение следующему актору. Акторы работают асинхронно, не передают контекст приложения между собой и не имеют прямого обращения к экземпляру другого актора. Каждый актор ничего не знает о другом акторе, у него есть только некий адрес. Общение происходит исключительно через отправку Message. Сами Messages должны быть immutable.
Также у актора есть Mailbox – это очередь входящих сообщений. Операция отправки сообщения является неблокирующей. Актор просто отправил сообщение и ничего не ждет в ответ.
В нашем приложении будет иерархия акторов – главный актор-супервизор, который будет создавать и инициализировать дочерние акторы-воркеры. Такая система с супервизором гарантирует, что в случае сбоя в работе супервизора по умолчанию он будет перезапускаться сам и создавать заново все дочерние акторы, начинать с чистого листа. Также у нас будет три дочерних актора:
- Consumer– актор, который будет читать сообщения из Kafka в формате protobuf и посылать эти файлы следующему актору – FileSaver. Сообщения представляют собой файл, который содержит какой-то набор полей, в том числе список неких messages. Messages из файла тоже содержат какой-то набор полей. Все эти данные нам необходимо сохранить в соответствующие таблицы в БД.
- FileSaver – актор, который будет получать сообщения от консюмера, сохранять данные в БД в таблицу file и посылать сообщения с вложенными в файл message следующему актору – MessageSaver.
- MessageSaver – актор, который будет получать сообщения от fileSaver’а и сохранять их в БД в таблицу message.
Первоначальная настройка
Итак, прежде всего подключим в Maven артефакты akka-actor_2.12 и akka-contrib_2.12 – это основной набор инструментов Акка для создания приложения:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-contrib_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.12</artifactId>
<version>${scala-java8-compat.version}</version>
</dependency>
Для старта приложения нам потребуется ActorSystem
. Он нужен для того, чтобы создать контекст нашего приложения. Из него будет создан наш первый и главный актор-супервизор.
Создать ActorSystem можно с помощью статичного метода ActorSystem.create()
. В него можно передать имя нашей системы и конфигурацию. Конфигурацию мы будем хранить в ресурсах в файле application.conf
, именно он будет по умолчанию парситься при загрузке контекста. Полный код конфигурации можно посмотреть на GitHub, приведу тут только основные настройки:
Пулы акторов:
deployment {
/applicationSupervisor/fileSaver {
router = round-robin-pool
nr-of-instances = 10
}
/applicationSupervisor/messageSaver {
router = round-robin-pool
nr-of-instances = 10
}
}
Данный кусок конфигурации говорит о том, чтобы при создании акторов с именами fileSaver
и messageSaver
они будут созданы с балансировщиком с политикой round-robin-pool, на котором будет крутиться по 10 инстансов каждого актора.
Диспетчер:
dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
Тут указывается, на каком диспетчере будет выполняться данный актор. Эта настройка не является обязательной. В нашем простом приложении мы не будем её добавлять, а оставим это по умолчанию.
Создание актора-супервизора и дочерних акторов
Мы создали систему. Теперь мы можем попросить систему создать главный актор-супервизор с помощью метода system.actorOf();
. Этот метод ожидает от нас конфигурацию актора (Props) и его имя. Если с именем всё понятно, то про конфигурацию надо сказать отдельно.
Для начала нужно создать класс супервизор, который будет являться актором. Создадим класс ApplicationSupervisor
и наследуемся от AbstractActor
.
Для создания конфигурации этого актора нужно описать статичный метод props()
, который будет создавать и возвращать экземпляр конфигурации этого класса. Пример кода ниже.
public static Props props() {
return Props.create(ApplicationSupervisor.class, ApplicationSupervisor::new);
}
Этого уже достаточно, чтобы создать актор-супервизор и запустить приложение. Вот пример кода для старта метода main
.
public final class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
private static final String SERVICE_NAME = "simple-example";
private Main() {}
/**
* Main entry point to application.
*
* @param args cmd args.
*/
public static void main(String[] args) {
LOG.info("Starting application");
final ActorSystem system = ActorSystem.create(
SERVICE_NAME, ConfigFactory.load().getConfig(SERVICE_NAME));
system.actorOf(ApplicationSupervisor.props(), "applicationSupervisor");
}
}
После того как мы унаследовали AbstractActor, необходимо переопределить метод createReceive()
, который будет обрабатывать входящие сообщения. Сам актор-супервизор не будет получать никаких сообщений. Он будет отвечать только за инициализацию DataSource
, которые необходимы для работы с БД и инициализацию остальных дочерних акторов. Соответственно, createReceive()
в случае получения любого сообщения будет логировать его и ничего не делать дальше.
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(m -> LOG.warn("Unknown message: {}", m))
.build();
}
@Override
public void postStop() throws Exception {
super.postStop();
PostgreSqlUtils.terminate(this.dataSource);
LOG.info("ApplicationSupervisor stopped");
}
Теперь нам нужно создать дочерних акторов fileSaver
и messageSaver
. Для этого вызовем метод-фабрику context().actorOf()
, который ждет от нас конфигурацию актора и наименование. В данном случае нам уже нужен не один актор, а целый пул. Такую конфигурацию можно создать с помощью метода FromConfig.getInstance().props()
. Это означает, что конфигурация пула будет взята из созданного ранее application.conf
, который мы описали выше.
В конце создадим актор Consumer
. Он будет один, поэтому для него конфигурацию пула передавать не надо. Иерархия у нас такая, что Consumer
читает Kafka и отправляет сообщения в fileSaver
. Тот, в свою очередь, сохраняет в БД данные и отправляет сообщения в messageSaver
, который сохраняет в БД данные. Соответственно, создавать эти акторы надо в обратной последовательности, чтобы передавать ссылку в нужной иерархии.
private void initActors() {
final FileDao fileDao =
new FileDao(this.dataSource);
final MessageDao messageDao =
new MessageDao(this.dataSource);
ActorRef messageSaver = context().actorOf(
FromConfig.getInstance().props(MessageSaver.props(
messageDao
)),
"messageSaver"
);
ActorRef fileSaver = context().actorOf(
FromConfig.getInstance().props(FileSaver.props(
messageSaver,
fileDao
)),
"fileSaver"
);
context().actorOf(Consumer.props(fileSaver), "consumer");
}
Отправка и обработка сообщений
Теперь, когда мы создали акторов, давайте обратим внимание на работу первого в очереди – это consumer. Переопределенный метод preStart()
будет инициализировать подписку на некий кафка-топик и вычитывать первое сообщение из него. Далее он будет отправлять сообщение самому себе.
Чтобы отправить сообщение актору, нам нужна ссылка на него. В данном случае мы отправляем сообщение себе, поэтому ссылку на самого себя возвращает метод self()
. Далее мы вызываем метод tell()
, в который передаём сообщение и ссылку на актор-отправитель.
self().tell(new KafkaBatch(batch), self());
Для обработки входящих сообщений переопределяем метод createReceive()
и в нём создаём receiveBuilder()
, который будет матчить входящие сообщения на соответствующие методы для их обработки.
@Override
public Receive createReceive() {
return receiveBuilder()
.match(KafkaBatch.class, this::processKafkaBatch)
.match(Done.class, m -> handleDone(m.getId()))
.matchAny(m -> LOG.warn("Unknown message: {}", m))
.build();
}
/**
* Process message from kafka.
*
* @param kafkaBatch byte array with message from kafka
*/
private void processKafkaBatch(final KafkaBatch kafkaBatch) {
currentBatchId = UUID.randomUUID().toString();
final List<FileWrapper> records = new LinkedList<>();
for (ConsumerRecord<byte[], byte[]> data : kafkaBatch.getEvents()) {
try {
final ConsumerData.File file =
ConsumerData.File.parseFrom(data.value());
records.add(new FileWrapper(currentBatchId, file));
} catch (InvalidProtocolBufferException e) {
LOG.error("Unknown data in Kafka message: {}", data.value());
}
}
if (records.isEmpty()) {
commitAndAskForNewData();
} else {
amountDone = 0;
currentBatchSize = records.size();
records.forEach(file -> fileSaver.tell(file, self()));
}
}
В методе processKafkaBatch()
мы запоминаем новый BatchId, он понадобится нам позже. Далее парсим протобаф-файл и создаем список обёрток, которые содержат файл и BatchId. Далее мы запоминаем количество получившихся сообщений, указываем amountDone = 0 (это говорит о том, что количество сообщений об успешном завершении работы вложенных акторов сейчас = 0) и в цикле отправляем по одному файлу следующему актору на обработку fileSaver.tell(file, self())
.
Зачем нам нужна эта часть с BatchId и amountDone? За тем, что мы хотим знать, когда fileSaver
выполнил свою работу и нам можно коммитить и получать из Kafka новый Batch. Для этого fileSaver
после сохранения в БД данных и отправки сообщений дальше в messageSaver
будет отправлять назад консюмеру Done сообщение с текущим BatchId. Метод для обработки входящих Done следующий:
public void handleDone(String batchId) {
if (Objects.equals(batchId, currentBatchId)) {
amountDone++;
if (amountDone == currentBatchSize) {
commitAndAskForNewData();
}
}
}
Вот так будет выглядеть обработка входящих в акторе FileSaver:
@Override
public Receive createReceive() {
return receiveBuilder()
.match(FileWrapper.class, this::processfile)
.match(Done.class, m -> handleDone(m.getId()))
.matchAny(m -> LOG.warn("Unknown message: {}", m))
.build();
}
private void processfile(final FileWrapper fileWrapper) {
sender = sender();
receivedBatchId = fileWrapper.getBatchId();
currentBatchId = UUID.randomUUID().toString();
long fileId = 0;
ConsumerData.File file = fileWrapper.getFile();
final List<ConsumerData.Message> messagesList = file.getMessagesList();
try {
fileId = fileDao.saveFile(file);
} catch (SQLException e) {
LOG.error(e.getMessage());
}
if (messagesList.isEmpty()) {
sender().tell(new Done(fileWrapper.getBatchId()), self());
} else {
amountDone = 0;
currentBatchSize = messagesList.size();
long finalFileId = fileId;
messagesList.forEach(message ->
messageSaver.tell(new MessageWrapper(
currentBatchId,
finalFileId,
message
), self())
);
}
}
Здесь мы запоминаем ссылку на sender
, она нам понадобится позже. А дальше делаем по аналогии с консюмером. Сохраняем в БД нужные данные и, если вложений больше нет, то говорим сендеру Done сразу, а если вложения есть, то посылаем их по одному актору messageSaver. Когда messageSaver вернёт нам соответствующее количество Done-сообщений, мы тоже говорим сендеру Done.
public void handleDone(String batchId) {
if (Objects.equals(batchId, currentBatchId)) {
amountDone++;
if (amountDone == currentBatchSize) {
sender.tell(new Done(receivedBatchId), self());
}
}
}
На этом разработка нашего простого приложения закончена. Мы не обрабатываем ошибочные ситуации в процессе работы актора, например, если произошёл какой-то SQLError
, то мы его просто логируем. Но можно, например, пробрасывать Fail-сообщение отправителю вверх, и он уже будет его обрабатывать каким-то специфичным образом.
В результате у нас получилась такая конвейерная система, которую легко масштабировать как вертикально, так и горизонтально. У нас есть устойчивость к сбоям.
Мы можем добавить мониторинг на каждую из стадий, чтобы понимать, где и кто тупит, чтобы потом масштабировать отдельный участок с помощью увеличения пула или разбития на несколько отдельных стадий и т.д. Само по себе создание актора это очень легковесный процесс. Создать или уничтожить его легко, это никак не связано с созданием или уничтожением нитей.
В завершении хотелось бы отметить, что приложение с использованием Akka легко проектировать и разрабатывать. Также у него есть очень удобный фреймворк для тестирования. Так что при выборе решения для разработки высоконагруженного приложения Akka даёт полный набор инструментов для его создания.