Написать пост

Особенности семантики exactly-once при разработке для Kafka на Python

Аватарка пользователя Эмиль Гатауллин

Рассказываем об особенностях семантики exactly-once на примере разработки проекта на Kafka и Python для сортировки данных.

Один из моих проектов — сервис для технологической платформы НЛМК — берет данные из топиков одного кластера Kafka, обрабатывает их, сортирует по определенному признаку и записывает в соответствующие этому признаку топики другого кластера. Оттуда они потом попадают в хранилище или используются разными сервисами. Причем сообщения читаются и обрабатываются не по одному, а сразу большой пачкой, по несколько тысяч за раз.

Одно из основных требований, предъявляемых к сервису: данные должны быть прочитаны из топиков-источников, обработаны и гарантированно записаны в результирующие топики. Какие-либо потери крайне нежелательны. Поэтому необходимо было выбрать сценарий обработки сообщений, подходящий для этих требований.

О том, как именно я решил проблему, и поговорим.

Какие семантики существуют для подобных задач?

  1. exactly-once — подход, при котором сообщение доставляется получателю строго один раз, без дублирования и потери данных.
  2. at-most-once — сообщение будет доставлено получателю не более одного раза, но может не быть доставлено вовсе.
  3. at-least-once — сообщение будет доставлено как минимум один раз, но возможно дублирование данных в результате повторной отправки.

На первый взгляд здесь хотелось бы использовать семантику exactly-once как наиболее выгодную. Однако далее я расскажу, с какими сложностями столкнулся в процессе ее реализации и почему в конце концов пришлось от нее отказаться. Примеры будут приведены с использованием Python библиотеки aiokafka, т. к. я использовал именно ее при разработке данного сервиса.

Поддержка exactly-once в Kafka

Поговорим о том, какие особенности Kafka поддерживают использование семантики exactly-once. До обновления 0.11, вышедшего в 2017 году, Kafka поддерживала семантику at-least-once, но с выходом этой версии появилось несколько механизмов, помогающих в реализации exactly-once в плане доставки сообщений. Далее об этих механизмах подробнее.

Режим идемпотентности в производителе (producer)

В общем случае идемпотентность — свойство объекта или операции при повторном применении операции к объекту давать тот же результат, что и при первом.

Данный режим включается с помощью параметра enable_idempotence = True. При его включении в производителе все операции отправки сообщений становятся идемпотентными, т. е. в случае ошибки, которая вызывает повторную отправку сообщения производителем (например, при сбое в соединении), сообщение будет записано только один раз, независимо от того, сколько попыток отправки было предпринято.

Режим транзакционности при отправке сообщений

Kafka теперь поддерживает транзакции при отправке пачки (batch) сообщений в несколько разделов (partition) одного топика, следуя принципу «все или ничего». Отправленные в топик сообщения из пачки будут видны потребителю (consumer) только при успешной отправке всех сообщений в транзакции. А в случае ошибки транзакция считается отмененной и потребители не получат тех сообщений, которые были частично отправлены.

Но для правильной работы транзакций необходимо назначить на каждом производителе свой уникальный transactional_id, который обеспечит непрерывность транзакционности между перезапусками сервиса. Также в настройках самой Kafka replication.factor должен быть равен 3 или более.

Особенности семантики exactly-once при разработке для Kafka на Python 1

Кроме этого, в потребителе должен быть выставлен параметр isolation_level = «read_commited». Он означает, что потребитель будет игнорировать сообщения из отмененных транзакций.

Особенности семантики exactly-once при разработке для Kafka на Python 2

Пример использования транзакции:

Особенности семантики exactly-once при разработке для Kafka на Python 3

Отдельно хочу отметить, что перечисленные механизмы касаются только отправки и получения сообщений, но никак не затрагивают их обработку. То есть они обеспечивают exactly-once при доставке сообщений, но не более того, и для решения задач создаваемого мной сервиса этого было недостаточно.

Сообщение могло быть успешно получено, но не было бы никаких гарантий, что в дальнейшем оно будет успешно обработано и успешно отправлено. В общем случае Kafka является лишь способом передачи данных, но никак не контролирует, что и как в нее записывается, реализуя концепцию «тупой сервер, умный клиент» — она подразумевает, что вся логика работы с сообщениями находится исключительно на клиентской стороне.

Мне хотелось построить логику сервиса таким способом, чтобы весь цикл, состоящий из получения, обработки и отправки, был в семантике exactly-once.

Особенности реализации сервиса

Забегая вперед, стоит сказать, что если бы создаваемый сервис читал из одного топика и писал тоже в один топик, особых сложностей с данной семантикой не возникло бы. Для написания клиента Kafka я использовал Python библиотеку aiokafka, в которой даже есть встроенная поддержка паттерна «Transactional Consume-Process-Produce».

Этот паттерн подразумевает, что отправка смещения в потребителе происходит после успешной транзакционной записи в производителе. В случае ошибки транзакция производителя откатывается (сообщения оказываются недоступны потребителям с isolation_level=«read_commited»), а смещение в потребителе не фиксируется.

Таким образом, неудачный цикл получения-обработки-отправки не производит никаких изменений и может быть безопасно повторен. Это хорошо работает до тех пор, пока у нас есть только один производитель.

В разрабатываемом сервисе данные после обработки сортировались по определенному признаку и отправлялись в соответствии с ним в разные топики разными производителями. Также нужно отметить, что принцип обработки сообщений в сервисе подразумевал, что сообщения получаются большими пачками, тысячами за раз.

Получение сообщений поочередно по одному нарушало бы логику сервиса, не говоря о значительном падении эффективности. Поэтому взять одно сообщение, обработать и отправить его одним производителем, после чего зафиксировать смещение в потребителе не представлялось возможным.

Тем не менее я постарался сделать реализацию, максимально близкую к «Transactional Consume-Process-Produce». Так, для потребителя был включен параметр auto_commit = False, это означает, что при получении сообщений смещение фиксируется вручную, а не автоматически. И была сделана следующая логика обработки.

  1. Потребитель читает пачку сообщений из топика, но не фиксирует смещение.
  2. Происходит процесс обработки пачки.
  3. Происходит сортировка результатов обработки и подготовка к отправке в разные топики.
  4. Производитель каждого топика отправляет свои данные в соответствующий топик и сообщает об успехе/неудаче.
  5. В случае успешной отправки всех сообщений всех производителей происходит фиксация смещения в потребителе.

Пример «ручной» фиксации смещения:

Особенности семантики exactly-once при разработке для Kafka на Python 4

Довольно очевидно, с какой проблемой здесь можно столкнуться. Допустим, пачка сообщений была успешно получена, обработана и частично отправлена в топики-получатели, но после этого происходит ошибка или неудачная отправка в одном из производителей.

Смещение в потребителе не будет зафиксировано, и далее эта же пачка сообщений будет им прочитана заново, затем снова обработана и отправлена, что приведет к дублированию данных. Транзакционность производителя здесь не может полностью решить проблему, т. к. транзакции работают в рамках одного производителя, а в нашем случае их несколько.

Да, для каждого производителя сообщения либо будут отправлены все, либо не отправлены вовсе, но это дает лишь частичный выигрыш.

После анализа сложившейся ситуации мной было принято решение отказаться от семантики exactly-once в цикле получения-обработки-отправки и остаться в рамках семантики at-least-once. Связано это с тем, что сервисы, которые далее выступали потребителями сообщений, корректно обрабатывали возможные дубликаты сообщений.

Гораздо критичнее было не получить какие-то сообщения вовсе, чем получить их несколько раз. Кроме того, очень важна производительность сервиса и максимальная его простота — без подключения дополнительных механизмов сохранения информации об отправленных сообщениях с целью добиться exactly-once во всем цикле обработки.

Каждый экземпляр сервиса должен был быть способен обрабатывать несколько миллионов сообщений в час.

При создании сервисов работы с Kafka семантика exactly-once может казаться наиболее выгодной, но при этом быть сложно реализуемой, особенно в случае, когда существует сразу несколько производителей или в обработке сообщений участвуют сторонние сервисы/компоненты, сохраняющие состояние, например, базы данных.

В зависимости от требований к создаваемому сервису не во всех случаях данная семантика может быть действительно необходима. Вполне возможно, ей можно пожертвовать ради большей простоты и производительности вашего приложения.

Следите за новыми постами
Следите за новыми постами по любимым темам
4К открытий4К показов