Принципы реактивного программирования с использованием библиотеки ReactiveX для Python на примере простого RSS-агрегатора
В последние годы реактивное программирование в целом, а технология ReactiveX в частности, обретает всё большую популярность среди разработчиков. Одни уже активно используют все преимущества этого подхода, а другие только “что-то слышали”. Со своей стороны я постараюсь помочь вам представить, насколько некоторые концепции реактивного программирования способны изменить взгляд на привычные, казалось бы, вещи.
Существует два принципиально различных способа организации больших систем: в соответствии с объектами и состояниями, которые живут в системе, и в соответствии с потоками данных, которые проходят через неё. Парадигма реактивного программирования предполагает легкость выражения потоков данных, а также распространение изменений благодаря этим потокам. Например, в императивном программировании операция присваивания означает конечность результата, тогда как в реактивном значение будет пересчитано при получении новых входных данных. Поток значений проходит в системе ряд трансформаций, которые необходимы для решения определенной задачи. Оперирование потоками позволяет системе быть расширяемой и асинхронной, а правильная реакция на возникающие ошибки – отказоустойчивой.
ReactiveX – библиотека, позволяющая создавать асинхронные и событийно-ориентированные программы, использующие наблюдаемые последовательности. Она расширяет шаблон Наблюдателя для поддержки последовательностей данных, добавляет операторы для их декларативного соединения, избавляя от необходимости заботиться о синхронизации и безопасности потоков, разделяемых структурах данных и неблокирующего I/O.
Одним из основных отличий библиотеки ReactiveX от функционального реактивного программирования является то, что она оперирует не непрерывно изменяющимися, а дискретными значениями, которые “испускаются” в течении длительного времени.
Стоит немного рассказать о том, что такое Observer, Observable, Subject. Модель Observable является источником данных и позволяет обрабатывать потоки асинхронных событий похожим образом с тем, который вы используете для коллекций данных, таких как массивы. И всё это вместо колбэков, а значит, код является более читабельным и менее склонным к ошибкам.
В ReactiveX наблюдатель (Observer) подписывается на Observable и впоследствии реагирует на элемент или последовательность элементов, которые тот отправляет. У каждого Observer, подписанного на Observable, вызывается метод Observer.on_next()
на каждый элемент потока данных, после которого может быть вызван как Observer.on_complete()
, так и Observer.on_error()
. Часто Observable применяется таким образом, что он не начинает отдавать данные до тех пор, пока кто-нибудь не подписывается на него. Это так называемые “ленивые вычисления” – значения вычисляются только тогда, когда в них возникает потребность.
Бывают задачи, для решения которых нужно соединить Observer и Observable, чтобы принимать сообщения о событиях и сообщать о них своим подписчикам. Для этого существует Subject, имеющий, кроме стандартной, ещё несколько реализаций:
- ReplaySubject имеет возможность кэшировать все поступившие в него данные, а при появлении нового подписчика – отдавать всю эту последовательность сначала, работая далее в обычном режиме.
- BehaviorSubject хранит последнее значение, по аналогии с ReplaySubject отдавая его появившемуся подписчику. При создании он получает значение по умолчанию, которое будет получать каждый новый подписчик, если последнего значения еще не было.
- AsyncSubject также хранит последнее значение, но не отдает данные, пока не завершится вся последовательность.
Observable и Observer – только начало ReactiveX. Они не несут в себе всю мощь, которую являют собой операторы, позволяющие трансформировать, объединять, манипулировать последовательностями элементов, которые отдают Observable.
В документации ReactiveX описание операторов включает в себя использование Marble Diagram. К примеру, вот как эти диаграммы представляют Observable и их трансформации:
Глядя на диаграмму ниже, легко понять, что оператор map трансформирует элементы, отдаваемые Observable, путем применения функции к каждому из них:
Хорошей иллюстрацией возможностей ReactiveX является приложение RSS-агрегатора. Здесь возникает необходимость асинхронной загрузки данных, фильтрации и трансформации значений, поддержания актуального состояния путем периодического обновления.
В этой статье примеры для представления основных принципов ReactiveX написаны с использованием библиотеки rx для языка программирования Python. Вот так, например, выглядит абстрактная реализация наблюдателя:
Наше приложение в режиме реального времени будет обмениваться сообщениями с браузером посредством веб-сокетов. Возможность легко реализовать это предоставляет Tornado.
Работа программы начинается с запуска сервера. При обращении браузера к серверу открывается веб-сокет.
Для обработки введенного пользователем запроса создается Subject, при подписке на который он отправляет значение по умолчанию (в нашем случае — пустую строку), а затем раз в секунду отправляет то, что введено пользователем и удовлетворяет условиям: длина 0 или больше 2, значение изменилось.
Также для периодического обновления новостей предусмотрен Observable, который раз в 60 секунд отдает значение.
Два этих потока соединяются оператором combine_latest
, в цепочку встраивается Observable для получения списка новостей. После чего на этот Observable создается подписка, вся цепочка начинает работать только в этот момент.
Следует подробнее остановиться на том, что такое “Observable для получения списка новостей”. Из списка url для получения новостей мы создаем поток данных, элементы которого приходят в функцию, где при помощи HTTP-клиента Tornado AsyncHTTPClient происходит асинхронная загрузка данных для каждого элемента списка urls. Из них также создается поток данных, который фильтруется по запросу, введенному пользователем. Из каждого потока мы берем по 5 новостей, которые приводим к нужному формату для отправки на фронтенд.
После того, как поток выходных данных сформирован, его подписчик начинает поэлементно получать данные. Функция send_response
отправляет полученные значения во фронтенд, который добавляет новость в список.
В файле feeder.js
Таким образом, реализуется push-технология, в которой данные поступают от сервера к фронтенду, который лишь отправляет введенный пользователем запрос для поиска по новостям.
В качестве заключения предлагаю задуматься о том, какая реализация получилась бы при привычном подходе с использованием колбэков вместо Observable, без возможности легко объединить потоки данных, без возможности мгновенной отправки данных потребителю-фронтенду и с необходимостью отслеживать изменения в строке запроса. Среди Python-разработчиков технология пока что практически не распространена, однако я вижу уже несколько возможностей её применить на текущих проектах.
Пример использования ReactiveX для Python вы можете найти в GitHub репозитории с демо-проектом RSS-агрегатора.
Выражаем благодарность Ксении, Python-разработчику компании Noveo, за подробное описание преимуществ реактивного программирования!
7К открытий7К показов