Пишем своё первое приложение-saver с использованием Akka Actor на Java
Пишем простое приложение на Java, использующее систему акторов Akka для обработки полученных сообщений из очереди и сохранения данных в БД.
5К открытий6К показов
Евгений Афанасьев
Java Developer в компании Luxoft (IT-ONE)
Akka была создана для решения проблем в высоконагруженных системах. Делается это с помощью модели акторов.
Актор сам по себе — это сущность, которая получает некий Message, обрабатывает его атомарно и посылает сообщение следующему актору. Акторы работают асинхронно, не передают контекст приложения между собой и не имеют прямого обращения к экземпляру другого актора. Каждый актор ничего не знает о другом акторе, у него есть только некий адрес. Общение происходит исключительно через отправку Message. Сами Messages должны быть immutable.
Также у актора есть Mailbox – это очередь входящих сообщений. Операция отправки сообщения является неблокирующей. Актор просто отправил сообщение и ничего не ждет в ответ.
В нашем приложении будет иерархия акторов – главный актор-супервизор, который будет создавать и инициализировать дочерние акторы-воркеры. Такая система с супервизором гарантирует, что в случае сбоя в работе супервизора по умолчанию он будет перезапускаться сам и создавать заново все дочерние акторы, начинать с чистого листа. Также у нас будет три дочерних актора:
- Consumer– актор, который будет читать сообщения из Kafka в формате protobuf и посылать эти файлы следующему актору – FileSaver. Сообщения представляют собой файл, который содержит какой-то набор полей, в том числе список неких messages. Messages из файла тоже содержат какой-то набор полей. Все эти данные нам необходимо сохранить в соответствующие таблицы в БД.
- FileSaver – актор, который будет получать сообщения от консюмера, сохранять данные в БД в таблицу file и посылать сообщения с вложенными в файл message следующему актору – MessageSaver.
- MessageSaver – актор, который будет получать сообщения от fileSaver’а и сохранять их в БД в таблицу message.
Первоначальная настройка
Итак, прежде всего подключим в Maven артефакты akka-actor_2.12 и akka-contrib_2.12 – это основной набор инструментов Акка для создания приложения:
Для старта приложения нам потребуется ActorSystem
. Он нужен для того, чтобы создать контекст нашего приложения. Из него будет создан наш первый и главный актор-супервизор.
Создать ActorSystem можно с помощью статичного метода ActorSystem.create()
. В него можно передать имя нашей системы и конфигурацию. Конфигурацию мы будем хранить в ресурсах в файле application.conf
, именно он будет по умолчанию парситься при загрузке контекста. Полный код конфигурации можно посмотреть на GitHub, приведу тут только основные настройки:
Пулы акторов:
Данный кусок конфигурации говорит о том, чтобы при создании акторов с именами fileSaver
и messageSaver
они будут созданы с балансировщиком с политикой round-robin-pool, на котором будет крутиться по 10 инстансов каждого актора.
Диспетчер:
Тут указывается, на каком диспетчере будет выполняться данный актор. Эта настройка не является обязательной. В нашем простом приложении мы не будем её добавлять, а оставим это по умолчанию.
Создание актора-супервизора и дочерних акторов
Мы создали систему. Теперь мы можем попросить систему создать главный актор-супервизор с помощью метода system.actorOf();
. Этот метод ожидает от нас конфигурацию актора (Props) и его имя. Если с именем всё понятно, то про конфигурацию надо сказать отдельно.
Для начала нужно создать класс супервизор, который будет являться актором. Создадим класс ApplicationSupervisor
и наследуемся от AbstractActor.
Для создания конфигурации этого актора нужно описать статичный метод props()
, который будет создавать и возвращать экземпляр конфигурации этого класса. Пример кода ниже.
Этого уже достаточно, чтобы создать актор-супервизор и запустить приложение. Вот пример кода для старта метода main.
После того как мы унаследовали AbstractActor, необходимо переопределить метод createReceive(), который будет обрабатывать входящие сообщения. Сам актор-супервизор не будет получать никаких сообщений. Он будет отвечать только за инициализацию DataSource
, которые необходимы для работы с БД и инициализацию остальных дочерних акторов. Соответственно, createReceive()
в случае получения любого сообщения будет логировать его и ничего не делать дальше.
Теперь нам нужно создать дочерних акторов fileSaver
и messageSaver
. Для этого вызовем метод-фабрику context().actorOf(), который ждет от нас конфигурацию актора и наименование. В данном случае нам уже нужен не один актор, а целый пул. Такую конфигурацию можно создать с помощью метода FromConfig.getInstance().props(). Это означает, что конфигурация пула будет взята из созданного ранее application.conf
, который мы описали выше.
В конце создадим актор Consumer
. Он будет один, поэтому для него конфигурацию пула передавать не надо. Иерархия у нас такая, что Consumer
читает Kafka и отправляет сообщения в fileSaver
. Тот, в свою очередь, сохраняет в БД данные и отправляет сообщения в messageSaver
, который сохраняет в БД данные. Соответственно, создавать эти акторы надо в обратной последовательности, чтобы передавать ссылку в нужной иерархии.
Отправка и обработка сообщений
Теперь, когда мы создали акторов, давайте обратим внимание на работу первого в очереди – это consumer. Переопределенный метод preStart() будет инициализировать подписку на некий кафка-топик и вычитывать первое сообщение из него. Далее он будет отправлять сообщение самому себе.
Чтобы отправить сообщение актору, нам нужна ссылка на него. В данном случае мы отправляем сообщение себе, поэтому ссылку на самого себя возвращает метод self()
. Далее мы вызываем метод tell()
, в который передаём сообщение и ссылку на актор-отправитель.
Для обработки входящих сообщений переопределяем метод createReceive()
и в нём создаём receiveBuilder()
, который будет матчить входящие сообщения на соответствующие методы для их обработки.
В методе processKafkaBatch()
мы запоминаем новый BatchId, он понадобится нам позже. Далее парсим протобаф-файл и создаем список обёрток, которые содержат файл и BatchId. Далее мы запоминаем количество получившихся сообщений, указываем amountDone = 0 (это говорит о том, что количество сообщений об успешном завершении работы вложенных акторов сейчас = 0) и в цикле отправляем по одному файлу следующему актору на обработку fileSaver.tell(file, self())
.
Зачем нам нужна эта часть с BatchId и amountDone? За тем, что мы хотим знать, когда fileSaver
выполнил свою работу и нам можно коммитить и получать из Kafka новый Batch. Для этого fileSaver
после сохранения в БД данных и отправки сообщений дальше в messageSaver
будет отправлять назад консюмеру Done сообщение с текущим BatchId. Метод для обработки входящих Done следующий:
Вот так будет выглядеть обработка входящих в акторе FileSaver:
Здесь мы запоминаем ссылку на sender, она нам понадобится позже. А дальше делаем по аналогии с консюмером. Сохраняем в БД нужные данные и, если вложений больше нет, то говорим сендеру Done сразу, а если вложения есть, то посылаем их по одному актору messageSaver. Когда messageSaver вернёт нам соответствующее количество Done-сообщений, мы тоже говорим сендеру Done.
На этом разработка нашего простого приложения закончена. Мы не обрабатываем ошибочные ситуации в процессе работы актора, например, если произошёл какой-то SQLError, то мы его просто логируем. Но можно, например, пробрасывать Fail-сообщение отправителю вверх, и он уже будет его обрабатывать каким-то специфичным образом.
В результате у нас получилась такая конвейерная система, которую легко масштабировать как вертикально, так и горизонтально. У нас есть устойчивость к сбоям.
Мы можем добавить мониторинг на каждую из стадий, чтобы понимать, где и кто тупит, чтобы потом масштабировать отдельный участок с помощью увеличения пула или разбития на несколько отдельных стадий и т.д. Само по себе создание актора это очень легковесный процесс. Создать или уничтожить его легко, это никак не связано с созданием или уничтожением нитей.
В завершении хотелось бы отметить, что приложение с использованием Akka легко проектировать и разрабатывать. Также у него есть очень удобный фреймворк для тестирования. Так что при выборе решения для разработки высоконагруженного приложения Akka даёт полный набор инструментов для его создания.
5К открытий6К показов