Принципы реактивного программирования с использованием библиотеки ReactiveX для Python на примере простого RSS-агрегатора

В последние годы реактивное программирование в целом, а технология ReactiveX в частности, обретает всё большую популярность среди разработчиков. Одни уже активно используют все преимущества этого подхода, а другие только «что-то слышали». Со своей стороны я постараюсь помочь вам представить, насколько некоторые концепции реактивного программирования способны изменить взгляд на привычные, казалось бы, вещи.

Существует два принципиально различных способа организации больших систем: в соответствии с объектами и состояниями, которые живут в системе, и в соответствии с потоками данных, которые проходят через неё. Парадигма реактивного программирования предполагает легкость выражения потоков данных, а также  распространение изменений благодаря этим потокам. Например, в императивном программировании операция присваивания означает конечность результата, тогда как в реактивном значение будет пересчитано при получении новых входных данных. Поток значений проходит в системе ряд трансформаций, которые необходимы для решения определенной задачи. Оперирование потоками позволяет системе быть расширяемой и асинхронной, а правильная реакция на возникающие ошибки – отказоустойчивой.

ReactiveX – библиотека, позволяющая создавать асинхронные и событийно-ориентированные программы, использующие наблюдаемые последовательности. Она расширяет шаблон Наблюдателя для поддержки последовательностей данных, добавляет операторы для их декларативного соединения, избавляя от необходимости заботиться о синхронизации и безопасности потоков, разделяемых структурах данных и неблокирующего I/O.

Одним из основных отличий библиотеки ReactiveX от функционального реактивного программирования является то, что она оперирует не непрерывно изменяющимися, а дискретными значениями, которые «испускаются» в течении длительного времени.

Стоит немного рассказать о том, что такое Observer, Observable, Subject. Модель Observable является источником данных и позволяет обрабатывать потоки асинхронных событий похожим образом с тем, который вы используете для коллекций данных, таких как массивы. И всё это вместо колбэков, а значит, код является более читабельным и менее склонным к ошибкам.

В ReactiveX наблюдатель (Observer) подписывается на Observable и впоследствии реагирует на элемент или последовательность элементов, которые тот отправляет. У каждого Observer, подписанного на Observable, вызывается метод Observer.on_next() на каждый элемент потока данных, после которого может быть вызван как Observer.on_complete(), так и Observer.on_error(). Часто Observable применяется таким образом, что он не начинает отдавать данные до тех пор, пока кто-нибудь не подписывается на него. Это так называемые «ленивые вычисления» – значения вычисляются только тогда, когда в них возникает потребность.

observable

Бывают задачи, для решения которых нужно соединить Observer и Observable, чтобы принимать сообщения о событиях и сообщать о них своим подписчикам. Для этого существует Subject, имеющий, кроме стандартной, ещё несколько реализаций:

  • ReplaySubject имеет возможность кэшировать все поступившие в него данные, а при появлении нового подписчика – отдавать всю эту последовательность сначала, работая далее в обычном режиме.
  • BehaviorSubject хранит последнее значение, по аналогии с ReplaySubject отдавая его появившемуся подписчику. При создании он получает значение по умолчанию, которое будет получать каждый новый подписчик, если последнего значения еще не было.
  • AsyncSubject также хранит последнее значение, но не отдает данные, пока не завершится вся последовательность.

Observable и Observer – только начало ReactiveX. Они не несут в себе всю мощь, которую являют собой операторы,  позволяющие трансформировать, объединять, манипулировать последовательностями элементов, которые отдают Observable.

В документации ReactiveX описание операторов включает в себя использование Marble Diagram. К примеру, вот как эти диаграммы представляют Observable и их трансформации:

legend

Глядя на диаграмму ниже, легко понять, что оператор map трансформирует элементы, отдаваемые Observable, путем применения функции к каждому из них:

map

Хорошей иллюстрацией возможностей ReactiveX является приложение RSS-агрегатора. Здесь возникает необходимость асинхронной загрузки данных, фильтрации и трансформации значений, поддержания актуального состояния путем периодического обновления.

В этой статье примеры для представления основных принципов ReactiveX написаны с использованием библиотеки rx для языка программирования Python. Вот так, например, выглядит абстрактная реализация наблюдателя:

Наше приложение в режиме реального времени будет обмениваться сообщениями с браузером посредством веб-сокетов. Возможность легко реализовать это предоставляет Tornado.

Работа программы начинается с запуска сервера. При обращении браузера к серверу открывается веб-сокет.

Для обработки введенного пользователем запроса создается Subject, при подписке на который он отправляет значение по умолчанию (в нашем случае — пустую строку), а затем раз в секунду отправляет то, что введено пользователем и удовлетворяет условиям: длина 0 или больше 2, значение изменилось.

Также для периодического обновления новостей предусмотрен Observable, который раз в 60 секунд отдает значение.

Два этих потока соединяются операторомcombine_latest , в цепочку встраивается Observable для получения списка новостей. После чего на этот Observable создается подписка, вся цепочка начинает работать только в этот момент.

Следует подробнее остановиться на том, что такое «Observable для получения списка новостей». Из списка url для получения новостей мы создаем поток данных, элементы которого приходят в функцию, где при помощи HTTP-клиента Tornado AsyncHTTPClient происходит асинхронная загрузка данных для каждого элемента списка urls. Из них также создается поток данных, который фильтруется по запросу, введенному пользователем. Из каждого потока мы берем по 5 новостей, которые приводим к нужному формату для отправки на фронтенд.

После того, как поток выходных данных сформирован, его подписчик начинает поэлементно получать данные. Функцияsend_response  отправляет полученные значения во фронтенд, который добавляет новость в список.

В файле feeder.js

Таким образом, реализуется push-технология, в которой данные поступают от сервера к фронтенду, который лишь отправляет введенный пользователем запрос для поиска по новостям.

В качестве заключения предлагаю задуматься о том, какая реализация получилась бы при привычном подходе с использованием колбэков вместо Observable, без возможности легко объединить потоки данных, без возможности мгновенной отправки данных потребителю-фронтенду и с необходимостью отслеживать изменения в строке запроса. Среди Python-разработчиков технология пока что практически не распространена, однако я вижу уже несколько возможностей её применить на текущих проектах.

Пример использования ReactiveX для Python вы можете найти в GitHub репозитории с демо-проектом RSS-агрегатора.


Выражаем благодарность Ксении, Python-разработчику компании Noveo, за подробное описание преимуществ реактивного программирования!