Аватарка пользователя EVG-A
EVG-A

Пишем своё первое приложение-saver с использованием Akka Actor на Java

Пишем простое приложение на Java, использующее систему акторов Akka для обработки полученных сообщений из очереди и сохранения данных в БД.

4454

Akka была создана для решения проблем в высоконагруженных системах. Делается это с помощью модели акторов.

Актор сам по себе — это сущность, которая получает некий Message, обрабатывает его атомарно и посылает сообщение следующему актору. Акторы работают асинхронно, не передают контекст приложения между собой и не имеют прямого обращения к экземпляру другого актора. Каждый актор ничего не знает о другом акторе, у него есть только некий адрес. Общение происходит исключительно через отправку Message. Сами Messages должны быть immutable.

Также у актора есть Mailbox – это очередь входящих сообщений. Операция отправки сообщения является неблокирующей. Актор просто отправил сообщение и ничего не ждет в ответ.

В нашем приложении будет иерархия акторов – главный актор-супервизор, который будет создавать и инициализировать дочерние акторы-воркеры. Такая система с супервизором гарантирует, что в случае сбоя в работе супервизора по умолчанию он будет перезапускаться сам и создавать заново все дочерние акторы, начинать с чистого листа. Также у нас будет три дочерних актора:

  1. Consumer– актор, который будет читать сообщения из Kafka в формате protobuf и посылать эти файлы следующему актору – FileSaver. Сообщения представляют собой файл, который содержит какой-то набор полей, в том числе список неких messages. Messages из файла тоже содержат какой-то набор полей. Все эти данные нам необходимо сохранить в соответствующие таблицы в БД.
  2. FileSaver – актор, который будет получать сообщения от консюмера, сохранять данные в БД в таблицу file и посылать сообщения с вложенными в файл message следующему актору – MessageSaver.
  3. MessageSaver – актор, который будет получать сообщения от fileSaver’а и сохранять их в БД в таблицу message.

Первоначальная настройка

Итак, прежде всего подключим в Maven артефакты akka-actor_2.12 и akka-contrib_2.12  – это основной набор инструментов Акка для создания приложения:

			<dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-contrib_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-slf4j_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-java8-compat_2.12</artifactId>
            <version>${scala-java8-compat.version}</version>
       </dependency>
		

Для старта приложения нам потребуется ActorSystem. Он нужен для того, чтобы создать контекст нашего приложения. Из него будет создан наш первый и главный актор-супервизор.

Создать ActorSystem можно с помощью статичного метода ActorSystem.create(). В него можно передать имя нашей системы и конфигурацию. Конфигурацию мы будем хранить в ресурсах в файле application.conf, именно он будет по умолчанию парситься при загрузке контекста. Полный код конфигурации можно посмотреть на GitHub, приведу тут только основные настройки:

Пулы акторов:

			deployment {
        /applicationSupervisor/fileSaver {
          router = round-robin-pool
          nr-of-instances = 10
        }
        /applicationSupervisor/messageSaver {
          router = round-robin-pool
          nr-of-instances = 10
        }
      }
		

Данный кусок конфигурации говорит о том, чтобы при создании акторов с именами fileSaver и messageSaver они будут созданы с балансировщиком с политикой round-robin-pool, на котором будет крутиться по 10 инстансов каждого актора.

Диспетчер:

			dispatcher {
    type = PinnedDispatcher
    executor = "thread-pool-executor"
  }
		

Тут указывается, на каком диспетчере будет выполняться данный актор. Эта настройка не является обязательной. В нашем простом приложении мы не будем её добавлять, а оставим это по умолчанию.

Создание актора-супервизора и дочерних акторов

Мы создали систему. Теперь мы можем попросить систему создать главный актор-супервизор с помощью метода system.actorOf();. Этот метод ожидает от нас конфигурацию актора (Props) и его имя. Если с именем всё понятно, то про конфигурацию надо сказать отдельно.

Для начала нужно создать класс супервизор, который будет являться актором. Создадим класс ApplicationSupervisor и наследуемся от AbstractActor.
Для создания конфигурации этого актора нужно описать статичный метод props(), который будет создавать и возвращать экземпляр конфигурации этого класса. Пример кода ниже.

			public static Props props() {
        return Props.create(ApplicationSupervisor.class, ApplicationSupervisor::new);
    }
		

Этого уже достаточно, чтобы создать актор-супервизор и запустить приложение. Вот пример кода для старта метода main.

			public final class Main {

    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
    private static final String SERVICE_NAME = "simple-example";

    private Main() {}

    /**
     * Main entry point to application.
     *
     * @param args cmd args.
     */
    public static void main(String[] args) {
        LOG.info("Starting application");
        final ActorSystem system = ActorSystem.create(
            SERVICE_NAME, ConfigFactory.load().getConfig(SERVICE_NAME));
        system.actorOf(ApplicationSupervisor.props(), "applicationSupervisor");
    }
}
		

После того как мы унаследовали AbstractActor, необходимо переопределить метод createReceive(), который будет обрабатывать входящие сообщения. Сам актор-супервизор не будет получать никаких сообщений. Он будет отвечать только за инициализацию DataSource, которые необходимы для работы с БД и инициализацию остальных дочерних акторов. Соответственно, createReceive() в случае получения любого сообщения будет логировать его и ничего не делать дальше.

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    }
    
    @Override
    public void postStop() throws Exception {
        super.postStop();
        PostgreSqlUtils.terminate(this.dataSource);
        LOG.info("ApplicationSupervisor stopped");
    }
		

Теперь нам нужно создать дочерних акторов fileSaver и messageSaver. Для этого вызовем метод-фабрику context().actorOf(), который ждет от нас конфигурацию актора и наименование. В данном случае нам уже нужен не один актор, а целый пул. Такую конфигурацию можно создать с помощью метода FromConfig.getInstance().props(). Это означает, что конфигурация пула будет взята из созданного ранее application.conf, который мы описали выше.

В конце создадим актор Consumer. Он будет один, поэтому для него конфигурацию пула передавать не надо. Иерархия у нас такая, что Consumer читает Kafka и отправляет сообщения в fileSaver. Тот, в свою очередь, сохраняет в БД данные и отправляет сообщения в messageSaver, который сохраняет в БД данные. Соответственно, создавать эти акторы надо в обратной последовательности, чтобы передавать ссылку в нужной иерархии.

			private void initActors() {
        final FileDao fileDao =
            new FileDao(this.dataSource);
        final MessageDao messageDao =
            new MessageDao(this.dataSource);

        ActorRef messageSaver = context().actorOf(
            FromConfig.getInstance().props(MessageSaver.props(
                messageDao
            )),
            "messageSaver"
        );
        ActorRef fileSaver = context().actorOf(
            FromConfig.getInstance().props(FileSaver.props(
                messageSaver,
                fileDao
            )),
            "fileSaver"
        );

        context().actorOf(Consumer.props(fileSaver), "consumer");
    }
		

Отправка и обработка сообщений

Теперь, когда мы создали акторов, давайте обратим внимание на работу первого в очереди – это consumer. Переопределенный метод preStart() будет инициализировать подписку на некий кафка-топик и вычитывать первое сообщение из него. Далее он будет отправлять сообщение самому себе.

Чтобы отправить сообщение актору, нам нужна ссылка на него. В данном случае мы отправляем сообщение себе, поэтому ссылку на самого себя возвращает метод self(). Далее мы вызываем метод tell(), в который передаём сообщение и ссылку на актор-отправитель.

			self().tell(new KafkaBatch(batch), self());
		

Для обработки входящих сообщений переопределяем метод createReceive() и в нём создаём receiveBuilder(), который будет матчить входящие сообщения на соответствующие методы для их обработки.

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(KafkaBatch.class, this::processKafkaBatch)
            .match(Done.class, m -> handleDone(m.getId()))
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    } 

     /**
     * Process message from kafka.
     *
     * @param kafkaBatch byte array with message from kafka
     */
    private void processKafkaBatch(final KafkaBatch kafkaBatch) {
        currentBatchId = UUID.randomUUID().toString();
        final List<FileWrapper> records = new LinkedList<>();
        for (ConsumerRecord<byte[], byte[]> data : kafkaBatch.getEvents()) {
            try {
                final ConsumerData.File file =
                    ConsumerData.File.parseFrom(data.value());
                records.add(new FileWrapper(currentBatchId, file));
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Unknown data in Kafka message: {}", data.value());
            }
        }
        if (records.isEmpty()) {
            commitAndAskForNewData();
        } else {
            amountDone = 0;
            currentBatchSize = records.size();
            records.forEach(file -> fileSaver.tell(file, self()));
        }
    }
		

В методе processKafkaBatch() мы запоминаем новый BatchId, он понадобится нам позже. Далее парсим протобаф-файл и создаем список обёрток, которые содержат файл и BatchId. Далее мы запоминаем количество получившихся сообщений, указываем amountDone = 0 (это говорит о том, что количество сообщений об успешном завершении работы вложенных акторов сейчас = 0) и в цикле отправляем по одному файлу следующему актору на обработку fileSaver.tell(file, self()).

Зачем нам нужна эта часть с BatchId и amountDone? За тем, что мы хотим знать, когда fileSaver выполнил свою работу и нам можно коммитить и получать из Kafka новый Batch. Для этого fileSaver после сохранения в БД данных и отправки сообщений дальше в messageSaver будет отправлять назад консюмеру Done сообщение с текущим BatchId. Метод для обработки входящих Done следующий:

			public void handleDone(String batchId) {
        if (Objects.equals(batchId, currentBatchId)) {
            amountDone++;
            if (amountDone == currentBatchSize) {
                commitAndAskForNewData();
            }
        }
    }
		

Вот так будет выглядеть обработка входящих в акторе FileSaver:

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(FileWrapper.class, this::processfile)
            .match(Done.class, m -> handleDone(m.getId()))
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    }

    private void processfile(final FileWrapper fileWrapper) {
        sender = sender();
        receivedBatchId = fileWrapper.getBatchId();
        currentBatchId = UUID.randomUUID().toString();
        long fileId = 0;
        ConsumerData.File file = fileWrapper.getFile();
        final List<ConsumerData.Message> messagesList = file.getMessagesList();
        try {
            fileId = fileDao.saveFile(file);
        } catch (SQLException e) {
            LOG.error(e.getMessage());
        }
        if (messagesList.isEmpty()) {
            sender().tell(new Done(fileWrapper.getBatchId()), self());
        } else {
            amountDone = 0;
            currentBatchSize = messagesList.size();
            long finalFileId = fileId;
            messagesList.forEach(message ->
                messageSaver.tell(new MessageWrapper(
                    currentBatchId,
                    finalFileId,
                    message
                ), self())
            );
        }
    }
		

Здесь мы запоминаем ссылку на sender, она нам понадобится позже. А дальше делаем по аналогии с консюмером. Сохраняем в БД нужные данные и, если вложений больше нет, то говорим сендеру Done сразу, а если вложения есть, то посылаем их по одному актору messageSaver. Когда messageSaver вернёт нам соответствующее количество Done-сообщений, мы тоже говорим сендеру Done.

			public void handleDone(String batchId) {
        if (Objects.equals(batchId, currentBatchId)) {
            amountDone++;
            if (amountDone == currentBatchSize) {
                sender.tell(new Done(receivedBatchId), self());
            }
        }
    }
		

На этом разработка нашего простого приложения закончена. Мы не обрабатываем ошибочные ситуации в процессе работы актора, например, если произошёл какой-то SQLError, то мы его просто логируем. Но можно, например, пробрасывать Fail-сообщение отправителю вверх, и он уже будет его обрабатывать каким-то специфичным образом.

В результате у нас получилась такая конвейерная система, которую легко масштабировать как вертикально, так и горизонтально. У нас есть устойчивость к сбоям.

Мы можем добавить мониторинг на каждую из стадий, чтобы понимать, где и кто тупит, чтобы потом масштабировать отдельный участок с помощью увеличения пула или разбития на несколько отдельных стадий и т.д. Само по себе создание актора это очень легковесный процесс. Создать или уничтожить его легко, это никак не связано с созданием или уничтожением нитей.

В завершении хотелось бы отметить, что приложение с использованием Akka легко проектировать и разрабатывать. Также у него есть очень удобный фреймворк для тестирования. Так что при выборе решения для разработки высоконагруженного приложения Akka даёт полный набор инструментов для его создания.

4454