Написать пост

Пишем своё первое приложение-saver с использованием Akka Actor на Java

Аватарка пользователя EVG-A

Пишем простое приложение на Java, использующее систему акторов Akka для обработки полученных сообщений из очереди и сохранения данных в БД.

Akka была создана для решения проблем в высоконагруженных системах. Делается это с помощью модели акторов.

Актор сам по себе — это сущность, которая получает некий Message, обрабатывает его атомарно и посылает сообщение следующему актору. Акторы работают асинхронно, не передают контекст приложения между собой и не имеют прямого обращения к экземпляру другого актора. Каждый актор ничего не знает о другом акторе, у него есть только некий адрес. Общение происходит исключительно через отправку Message. Сами Messages должны быть immutable.

Также у актора есть Mailbox – это очередь входящих сообщений. Операция отправки сообщения является неблокирующей. Актор просто отправил сообщение и ничего не ждет в ответ.

В нашем приложении будет иерархия акторов – главный актор-супервизор, который будет создавать и инициализировать дочерние акторы-воркеры. Такая система с супервизором гарантирует, что в случае сбоя в работе супервизора по умолчанию он будет перезапускаться сам и создавать заново все дочерние акторы, начинать с чистого листа. Также у нас будет три дочерних актора:

  1. Consumer– актор, который будет читать сообщения из Kafka в формате protobuf и посылать эти файлы следующему актору – FileSaver. Сообщения представляют собой файл, который содержит какой-то набор полей, в том числе список неких messages. Messages из файла тоже содержат какой-то набор полей. Все эти данные нам необходимо сохранить в соответствующие таблицы в БД.
  2. FileSaver – актор, который будет получать сообщения от консюмера, сохранять данные в БД в таблицу file и посылать сообщения с вложенными в файл message следующему актору – MessageSaver.
  3. MessageSaver – актор, который будет получать сообщения от fileSaver’а и сохранять их в БД в таблицу message.

Первоначальная настройка

Итак, прежде всего подключим в Maven артефакты akka-actor_2.12 и akka-contrib_2.12  – это основной набор инструментов Акка для создания приложения:

			<dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-contrib_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-slf4j_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-java8-compat_2.12</artifactId>
            <version>${scala-java8-compat.version}</version>
       </dependency>
		

Для старта приложения нам потребуется ActorSystem. Он нужен для того, чтобы создать контекст нашего приложения. Из него будет создан наш первый и главный актор-супервизор.

Создать ActorSystem можно с помощью статичного метода ActorSystem.create(). В него можно передать имя нашей системы и конфигурацию. Конфигурацию мы будем хранить в ресурсах в файле application.conf, именно он будет по умолчанию парситься при загрузке контекста. Полный код конфигурации можно посмотреть на GitHub, приведу тут только основные настройки:

Пулы акторов:

			deployment {
        /applicationSupervisor/fileSaver {
          router = round-robin-pool
          nr-of-instances = 10
        }
        /applicationSupervisor/messageSaver {
          router = round-robin-pool
          nr-of-instances = 10
        }
      }
		

Данный кусок конфигурации говорит о том, чтобы при создании акторов с именами fileSaver и messageSaver они будут созданы с балансировщиком с политикой round-robin-pool, на котором будет крутиться по 10 инстансов каждого актора.

Диспетчер:

			dispatcher {
    type = PinnedDispatcher
    executor = "thread-pool-executor"
  }
		

Тут указывается, на каком диспетчере будет выполняться данный актор. Эта настройка не является обязательной. В нашем простом приложении мы не будем её добавлять, а оставим это по умолчанию.

Создание актора-супервизора и дочерних акторов

Мы создали систему. Теперь мы можем попросить систему создать главный актор-супервизор с помощью метода system.actorOf();. Этот метод ожидает от нас конфигурацию актора (Props) и его имя. Если с именем всё понятно, то про конфигурацию надо сказать отдельно.

Для начала нужно создать класс супервизор, который будет являться актором. Создадим класс ApplicationSupervisor и наследуемся от AbstractActor.
Для создания конфигурации этого актора нужно описать статичный метод props(), который будет создавать и возвращать экземпляр конфигурации этого класса. Пример кода ниже.

			public static Props props() {
        return Props.create(ApplicationSupervisor.class, ApplicationSupervisor::new);
    }
		

Этого уже достаточно, чтобы создать актор-супервизор и запустить приложение. Вот пример кода для старта метода main.

			public final class Main {

    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
    private static final String SERVICE_NAME = "simple-example";

    private Main() {}

    /**
     * Main entry point to application.
     *
     * @param args cmd args.
     */
    public static void main(String[] args) {
        LOG.info("Starting application");
        final ActorSystem system = ActorSystem.create(
            SERVICE_NAME, ConfigFactory.load().getConfig(SERVICE_NAME));
        system.actorOf(ApplicationSupervisor.props(), "applicationSupervisor");
    }
}
		

После того как мы унаследовали AbstractActor, необходимо переопределить метод createReceive(), который будет обрабатывать входящие сообщения. Сам актор-супервизор не будет получать никаких сообщений. Он будет отвечать только за инициализацию DataSource, которые необходимы для работы с БД и инициализацию остальных дочерних акторов. Соответственно, createReceive() в случае получения любого сообщения будет логировать его и ничего не делать дальше.

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    }
    
    @Override
    public void postStop() throws Exception {
        super.postStop();
        PostgreSqlUtils.terminate(this.dataSource);
        LOG.info("ApplicationSupervisor stopped");
    }
		

Теперь нам нужно создать дочерних акторов fileSaver и messageSaver. Для этого вызовем метод-фабрику context().actorOf(), который ждет от нас конфигурацию актора и наименование. В данном случае нам уже нужен не один актор, а целый пул. Такую конфигурацию можно создать с помощью метода FromConfig.getInstance().props(). Это означает, что конфигурация пула будет взята из созданного ранее application.conf, который мы описали выше.

В конце создадим актор Consumer. Он будет один, поэтому для него конфигурацию пула передавать не надо. Иерархия у нас такая, что Consumer читает Kafka и отправляет сообщения в fileSaver. Тот, в свою очередь, сохраняет в БД данные и отправляет сообщения в messageSaver, который сохраняет в БД данные. Соответственно, создавать эти акторы надо в обратной последовательности, чтобы передавать ссылку в нужной иерархии.

			private void initActors() {
        final FileDao fileDao =
            new FileDao(this.dataSource);
        final MessageDao messageDao =
            new MessageDao(this.dataSource);

        ActorRef messageSaver = context().actorOf(
            FromConfig.getInstance().props(MessageSaver.props(
                messageDao
            )),
            "messageSaver"
        );
        ActorRef fileSaver = context().actorOf(
            FromConfig.getInstance().props(FileSaver.props(
                messageSaver,
                fileDao
            )),
            "fileSaver"
        );

        context().actorOf(Consumer.props(fileSaver), "consumer");
    }
		

Отправка и обработка сообщений

Теперь, когда мы создали акторов, давайте обратим внимание на работу первого в очереди – это consumer. Переопределенный метод preStart() будет инициализировать подписку на некий кафка-топик и вычитывать первое сообщение из него. Далее он будет отправлять сообщение самому себе.

Чтобы отправить сообщение актору, нам нужна ссылка на него. В данном случае мы отправляем сообщение себе, поэтому ссылку на самого себя возвращает метод self(). Далее мы вызываем метод tell(), в который передаём сообщение и ссылку на актор-отправитель.

			self().tell(new KafkaBatch(batch), self());
		

Для обработки входящих сообщений переопределяем метод createReceive() и в нём создаём receiveBuilder(), который будет матчить входящие сообщения на соответствующие методы для их обработки.

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(KafkaBatch.class, this::processKafkaBatch)
            .match(Done.class, m -> handleDone(m.getId()))
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    } 

     /**
     * Process message from kafka.
     *
     * @param kafkaBatch byte array with message from kafka
     */
    private void processKafkaBatch(final KafkaBatch kafkaBatch) {
        currentBatchId = UUID.randomUUID().toString();
        final List<FileWrapper> records = new LinkedList<>();
        for (ConsumerRecord<byte[], byte[]> data : kafkaBatch.getEvents()) {
            try {
                final ConsumerData.File file =
                    ConsumerData.File.parseFrom(data.value());
                records.add(new FileWrapper(currentBatchId, file));
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Unknown data in Kafka message: {}", data.value());
            }
        }
        if (records.isEmpty()) {
            commitAndAskForNewData();
        } else {
            amountDone = 0;
            currentBatchSize = records.size();
            records.forEach(file -> fileSaver.tell(file, self()));
        }
    }
		

В методе processKafkaBatch() мы запоминаем новый BatchId, он понадобится нам позже. Далее парсим протобаф-файл и создаем список обёрток, которые содержат файл и BatchId. Далее мы запоминаем количество получившихся сообщений, указываем amountDone = 0 (это говорит о том, что количество сообщений об успешном завершении работы вложенных акторов сейчас = 0) и в цикле отправляем по одному файлу следующему актору на обработку fileSaver.tell(file, self()).

Зачем нам нужна эта часть с BatchId и amountDone? За тем, что мы хотим знать, когда fileSaver выполнил свою работу и нам можно коммитить и получать из Kafka новый Batch. Для этого fileSaver после сохранения в БД данных и отправки сообщений дальше в messageSaver будет отправлять назад консюмеру Done сообщение с текущим BatchId. Метод для обработки входящих Done следующий:

			public void handleDone(String batchId) {
        if (Objects.equals(batchId, currentBatchId)) {
            amountDone++;
            if (amountDone == currentBatchSize) {
                commitAndAskForNewData();
            }
        }
    }
		

Вот так будет выглядеть обработка входящих в акторе FileSaver:

			@Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(FileWrapper.class, this::processfile)
            .match(Done.class, m -> handleDone(m.getId()))
            .matchAny(m -> LOG.warn("Unknown message: {}", m))
            .build();
    }

    private void processfile(final FileWrapper fileWrapper) {
        sender = sender();
        receivedBatchId = fileWrapper.getBatchId();
        currentBatchId = UUID.randomUUID().toString();
        long fileId = 0;
        ConsumerData.File file = fileWrapper.getFile();
        final List<ConsumerData.Message> messagesList = file.getMessagesList();
        try {
            fileId = fileDao.saveFile(file);
        } catch (SQLException e) {
            LOG.error(e.getMessage());
        }
        if (messagesList.isEmpty()) {
            sender().tell(new Done(fileWrapper.getBatchId()), self());
        } else {
            amountDone = 0;
            currentBatchSize = messagesList.size();
            long finalFileId = fileId;
            messagesList.forEach(message ->
                messageSaver.tell(new MessageWrapper(
                    currentBatchId,
                    finalFileId,
                    message
                ), self())
            );
        }
    }
		

Здесь мы запоминаем ссылку на sender, она нам понадобится позже. А дальше делаем по аналогии с консюмером. Сохраняем в БД нужные данные и, если вложений больше нет, то говорим сендеру Done сразу, а если вложения есть, то посылаем их по одному актору messageSaver. Когда messageSaver вернёт нам соответствующее количество Done-сообщений, мы тоже говорим сендеру Done.

			public void handleDone(String batchId) {
        if (Objects.equals(batchId, currentBatchId)) {
            amountDone++;
            if (amountDone == currentBatchSize) {
                sender.tell(new Done(receivedBatchId), self());
            }
        }
    }
		

На этом разработка нашего простого приложения закончена. Мы не обрабатываем ошибочные ситуации в процессе работы актора, например, если произошёл какой-то SQLError, то мы его просто логируем. Но можно, например, пробрасывать Fail-сообщение отправителю вверх, и он уже будет его обрабатывать каким-то специфичным образом.

В результате у нас получилась такая конвейерная система, которую легко масштабировать как вертикально, так и горизонтально. У нас есть устойчивость к сбоям.

Мы можем добавить мониторинг на каждую из стадий, чтобы понимать, где и кто тупит, чтобы потом масштабировать отдельный участок с помощью увеличения пула или разбития на несколько отдельных стадий и т.д. Само по себе создание актора это очень легковесный процесс. Создать или уничтожить его легко, это никак не связано с созданием или уничтожением нитей.

В завершении хотелось бы отметить, что приложение с использованием Akka легко проектировать и разрабатывать. Также у него есть очень удобный фреймворк для тестирования. Так что при выборе решения для разработки высоконагруженного приложения Akka даёт полный набор инструментов для его создания.

Следите за новыми постами
Следите за новыми постами по любимым темам
5К открытий5К показов