Реактивное программирование на реальных примерах: подробное введение

Реактивное программирование для новичков

Обучение реактивному подходу в программировании — достаточно непростая вещь, и недостаток обучающих материалов только усугубляет этот процесс. Большинство существующих обучающих пособий не дают глубокого обзора и не рассказывают о том, как спроектировать архитектуру проекта в целом.

Этот материал направлен на то, чтобы помочь новичкам начать думать по-настоящему “реактивно”.

Так что же такое реактивное программирование?

Есть множество не до конца верных определений и объяснений в интернете. Википедия дает слишком скупое описание. Ответы на Stack Overflow часто непонятны новичкам. Реактивный Манифест выглядит так, будто его писали для руководителей проектов или бизнесменов. Rx терминология от Microsoft, гласящая о том, что “Rx = Observables + LINQ + Schedulers”, звучит настолько тяжело и по-майкрософтовски, что большинство из нас слабо понимает, о чем идёт речь. Такие термины, как “реактивность” и “распространение изменений” не выражают ничего, что бы отличалось от обычного MV* подхода, реализованного уже на бесчисленном множестве языков. Любой фреймворк реагирует на изменения моделей. В любом фреймворке изменения распространяются. Если бы это было не так, пользователь не видел бы никаких изменений.

Дадим подробное объяснение термину “реактивное программирование”.

Реактивное программирование — программирование с асинхронными потоками данных

Впрочем, ничего нового. Event bus’ы или обычные события клика — это тоже асинхронные потоки данных, которые вы можете прослушивать, чтобы реагировать какими-либо действиями. Реактивность — это та же самая идея, возведенная в абсолют. Вы можете создавать потоки данных не только из событий наведения или кликания мышью. Потоком может быть что угодно: переменные, пользовательский ввод, свойства, кэш, структуры данных и т.п. Например, представьте, что ваша лента новостей в Твиттере — поток событий. Вы можете слушать этот поток и реагировать на события соответственно.

Кроме этого, вы получаете удивительный набор функций для комбинирования, создания и фильтрации этих потоков. Вот где проявляется вся магия этого подхода. Один или несколько потоков могут использоваться как входные данные для другого потока. Вы можете объединять два потока. Также вы можете фильтровать поток, выбирая только те события, которые вам интересны.

Так как потоки — основопологающая вещь в реактивном подходе, давайте рассмотрим их подробнее на примере пользовательского клика мышью:

Реактивное программирование

Поток — это последовательность событий, упорядоченная по времени. Он может выбрасывать три типа данных: значение (определенного типа), ошибку или сигнал завершения. Сигнал завершения распространяется, когда текущее окно или окно, содержащее кнопку, закрывается.

Мы перехватываем эти события асинхронно, указывая одну функцию, которая будет вызываться, когда выброшено значение, другую для ошибок и третью для обработки сигнала завершения. В некоторых случаях можно опустить последние две и сфокусироваться на объявлении функции для перехвата значений. Прослушивание потока называется подпиской (subscribing). Функции, которые мы объявляем, называются наблюдателями (observer). Поток — это объект наших наблюдений (observable, наблюдаемый объект). Это в точности паттерн проектирования, называемый “Наблюдатель“. Подробнее о шаблонах проектирования для новичков, читайте в нашей статье.

В данном руководстве мы будем использовать альтернативный способ представления вышеупомянутой диаграммы с помощью ASCII символов:

--a---b-c---d---X---|->

a, b, c, d - генерируемые значения
X - ошибка
| - сигнал завершения
---> - временная ось

Теперь давайте сгенерируем новые потоки сообщений клика, трансформированные из оригинального потока.

Для начала сделаем поток счетчиков, который определяет, сколько раз кнопка была нажата. В большинстве реактивных библиотек у каждого потока есть множество встроенных функций, таких как map, filter, scan и т.д. Когда вы вызываете одну из этих функций, например clickStream.map(f), она возвращает новый поток, основанный на родительском. Функции не модифицируют родительский поток. Это называется неизменяемостью и является неотъемлемой частью реактивного подхода, позволяя нам вызывать цепочку функций, например clickStream.map(f).scan(g):

  clickStream: ---c----c--c----c------c-->
               vvv map(c становится 1) vvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

Функция map(f) заменяет каждое полученное значение в соответствии с вашей реализацией функции f. В нашем случае функция map производит значение “1” после каждого клика. Функция scan(g) аггрегирует все предыдущие значения, производя значение x = g(accumulated, current), где g в данном случае — это просто функция сложения. В конечном итоге counterStream выбрасывает общее количество кликов.

Чтобы вы поняли всю мощь реактивного подхода, давайте предположим, что вы хотите реализовать поток событий “двойной клик”. Чтобы сделать эту задачу еще интереснее новый поток должен принимать множественные нажатия за двойные. Представьте себе, как бы вы реализовывали эту задачу в императивном стиле. Потребовалось бы несколько переменных, хранящих состояние, и использование интервалов.

В реактивном же подходе все достаточно просто. Всю описанную выше логику можно реализовать четырьмя строками кода. Но давайте пока не останавливаться на коде. Лучший способ научиться понимать и проектировать потоки, вне зависимости от того, новичок вы или эксперт — это изображать диаграммы:

Реактивное программирование

Серые прямоугольники — это функции, которые трансформируют один поток в другой. Сначала мы собираем клики в списки. Если прошло 250 миллисекунд без единого нажатия кнопки — мы применяем функцию map() на каждом из списков, чтобы вычислить его длину. В конце мы фильтруем списки с длиной 1, используя функцию filter(x >= 2). Вот так, в три действия, мы получаем результат — поток событий множественных кликов. Мы можем подписаться на него и использовать, как пожелаем.

Этот пример показывает всю простоту, с которой реализовывается достаточно сложная на первый взгляд задача, если мы используем реактивный подход.

Для чего нужно реактивное программирование

Реактивный подход повышает уровень абстракции вашего кода и вы можете сконцентрироваться на взаимосвязи событий, которые определяют бизнес-логику, вместо того, чтобы постоянно поддерживать код с большим количеством деталей реализации. Код в реактивном программировании, вероятно, будет короче.

Преимущество более заметно в современных веб- и мобильных приложениях, которые работают с большим количеством разнообразных UI-событий. 10 лет назад всё взаимодействие с веб-страницей сводилось к отправке больших форм на сервер и выполнении простого рендеринга в клиентской части. Сейчас приложения более сложны: изменение одного поля может повлечь за собой автоматическое сохранение данных на сервере, информация о новом “лайке” должна отправиться другим подключенным пользователям и т.д.

Реактивное программирование очень хорошо подходит для обработки большого количества разнообразных событий.

Начинаем думать в реактивном стиле

В последующих примерах используется JavaScript и RxJS, но Rx-библиотеки доступны для многих других языков и платформ (.NET, Java, Scala, ClojureJavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy, и т.д.). На нашем сайте есть руководства по использованию библиотек RxSwift и ReactiveX в Python.

Реализуем виджет “На кого подписаться”

В Twitter есть такой виджет, который предлагает вам другие аккаунты, на которые вы можете подписаться:

Мы намерены реализовать его основную функциональность:

  • Загрузка из API и вывод трех аккаунтов;
  • По клику кнопки “Обновить” вывод других трех аккаунтов;
  • По клику кнопки “x” рядом с аккаунтом — удаление его из виджета и вывод другого аккаунта;
  • Отображение аватарки и ссылки на аккаунт в каждой из трех строк.

Вместо Twitter-аккаунтов, которые закрыты для неавторизованных пользователей, мы будем использовать Github API и брать аккаунты оттуда. Ссылку на Github API для получения списка пользователей вы можете найти в официальной документации. Также можете смотреть на готовый код данного примера.

Запрос и ответ

Как подойти к решению этой проблемы в Rx-стиле? Надо начать с того, что (почти) все, что угодно может быть потоком. Первое, что мы реализуем, будет “Загрузка из API и вывод трех аккаунтов”. Ничего необычного, нужно просто (1) сделать запрос, (2) получить ответ, (3) отобразить ответ. Представим запрос в качестве потока.

При инициализации мы должны сделать только один запрос, так что если мы смоделируем его, как поток данных, он будет выбрасывать только одно значение. В дальнейшем мы будем делать множество запросов, но на данный момент нам нужен только один.

--a------|->

Где а - это строка 'https://api.github.com/users'

Когда происходит запрос, он сообщает нам две вещи: когда запрос должен быть выполнен — время генерации события, и куда мы делаем запрос — значение генерируемого событие, строка, содержащая URL.

Создать поток, содержащий одно значение, очень просто с библиотеками семейства Rx*:

var requestStream = Rx.Observable.just('https://api.github.com/users');

То, что мы написали — это просто поток, содержащий строку, который не делает ничего, так что мы должны как-то заставить его действовать так, как нам нужно. Это делается с помощью подписки на поток:

requestStream.subscribe(function(requestUrl) {
  // выполняем запрос
  jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

Заметьте, что мы используем Ajax-коллбэк (callback) из библиотеки jQuery, чтобы управлять асинхронностью операции запроса. Если вы слабо понимаете, что такое callback’и, почитайте нашу статью об эволюции асинхронного программирования в JS. “Но подождите, Rx же работает с асинхронными потоками данных. Не может ли ответ на запрос быть потоком, содержащим данные, которые придут когда-нибудь позже?” — можете спросить вы. Что ж, на концептуальном уровне все верно, давайте попробуем это реализовать:

requestStream.subscribe(function(requestUrl) {
  // выполняем запрос
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });

  responseStream.subscribe(function(response) {
    // делаем что-то с ответом
  });
}

Rx.Observable.create() создает пользовательский поток данных, информируя каждого подписчика о событиях (onNext()) или ошибках (onError()). Мы обернули Ajax-промис в соответствующий коллбэк. Значит ли это, что Promise — то же самое, что Observable? Да, значит. Подробнее о Promis’ах читайте в нашей вводной статье.

Observable — это Promise++. В Rx вы можете конвертировать Promise в Observable очень простым образом:

var stream = Rx.Observable.fromPromise(promise);

Единственное отличие между Promise и Observable в том, что Observable не совместим с Promises/A+. Promise — это, по сути, Observable с одним генерируемым значением. Потоки в Rx расширяют промисы, позволяя возвращать множество значений.

Возвращаясь к нашему примеру: вы можете заметить, что мы вызываем функцию subscribe() два раза — один внутри другого. Также создание responseStream зависит от requestStream. Как было сказано выше, в Rx есть простые механизмы, позволяющие трансформировать и создавать новые потоки из других, так что мы должны этим воспользоваться.

Одна из таких функций, с которой вы уже познакомились — map(f) — берет каждое значение из потока A, применяет на нем f() и производит значение для потока B. Если мы применим эту функцию на потоке запроса и ответа, мы можем преобразовать список URL’ов в промисы ответа.

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Затем мы должны создать поток потоков, называемый так же метапоток (metastream). Не пугайтесь, все достаточно просто. Метапоток — это такой поток, в котором каждое генерируемое им значение является потоком. Можно представить их себе как указатели: каждое генерируемое значение — указатель на новый поток. В нашем примере URL каждого запроса преобразуется в указатель на поток, содержащий промис ответа.

“Но зачем же нам обернутые в потоки ответы?” — можете спросить вы. В данном случае можно преобразовать метапоток в обычный поток ответов сервера, в котором каждое генерируемое значение является JSON-объектом, а не промисом, с помощью функции flatmap(). Но, тем не менее, метапотоки — обычное явление в реактивном программировании, и в частности, они используются для обработки асинхронных запросов.

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Как и ожидалось, если у нас впоследствии будут какие-то события, генерируемые потоком запросов, поток ответов будет реагировать на них соответствующим образом:

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(нижним регистром обозначен запрос, верхним - ответ)

И теперь, когда у нас есть поток ответов, мы можем отрендерить получаемые данные:

responseStream.subscribe(function(response) {
  // рендерим ответ в DOM
});

Весь код целиком:

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // рендерим ответ в DOM
});

Кнопка обновления

Нужно отметить, что список пользователей, который мы получаем по API, состоит из 100 элементов. API позволяет нам задавать смещение списка, но не его размер, так что пока мы используем только 3 объекта, игнорируя остальные. Вы научитесь кэшировать ответ чуть позже.

Каждый раз, когда пользователь нажимает кнопку обновления, поток запросов должен сгенерировать URL, чтобы мы могли получить новые данные. Для этой задачи нам потребуется сделать две вещи: реализовать поток событий нажатия на кнопку, а также изменить поток запросов так, чтобы он реагировал на события потока нажатий. К счастью, RxJS позволяет нам преобразовать обычные JavaScript-события в Observable.

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

Давайте поменяем поток запросов так, чтобы при нажатии кнопки обновления генерировался URL со случайным параметром смещения списка.

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Похоже, мы что-то сломали. Теперь запрос срабатывает только после того, как мы нажали кнопку. Но по условию задачи мы должны делать запрос и при инициализации. Давайте попробуем починить наш код.

Для начала создадим разные потоки для вышеупомянутых условий:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

Но как же теперь соединить события этих двух потоков в один? В этом нам поможет функция merge(). Вот визуальное представление того, что она делает:

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

Ну, теперь все очень просто:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

Также есть альтернативный и более чистый способ реализовать задачу без вспомогательных переменных:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .merge(Rx.Observable.just('https://api.github.com/users'));

А можно и еще короче!

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');

Функция startWith() делает как раз то, что нам нужно. Не важно, как вы реализовали поток: если вы вызвали функцию startWith(x), x будет начальным значением.

Вы заметили, что у нас дублируется URL? Давайте избавимся от дубликата, передвинув startWith() поближе к refreshClickStream, чтобы эмулировать нажатие кнопки при инициализации приложения:

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

То, что надо!

Моделируем 3 рекомендации с помощью потоков

Теперь, вместе с кнопкой обновления, у нас появилась проблема: при нажатии этой кнопки текущие 3 рекомендации не исчезают. Новые предложения появляются, как только с сервера пришел ответ, но для того, чтобы наш UI выглядел отзывчивым, мы должны очищать текущие предложения сразу же.

refreshClickStream.subscribe(function() {
  // очищаем текущие рекомендации
});

Теперь у нас есть два подписчика, влияющих на DOM-элементы (другой подписывается на responseStream) и это соответствует принципу “Разделяй и властвуй“. Вы еще помните мантру реактивного подхода? Напоминаем:

Реактивное программирование

Все является потоком

Давайте сделаем так, чтобы рекомендации были потоками, в которых каждое генерируемое значение — это JSON-объект, содержащий данные рекомендации. Мы сделаем по потоку на каждую из трех рекомендаций. Вот так выглядит поток для рекомендации №1:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // берем случайную из списка
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

Скопипастим этот код для потока №2 (suggestion2Stream) и №3 (suggestion3Stream). Подумайте над тем, как можно избежать дублирования кода в данном примере. Это будет отличным упражнением. А еще и очень хорошим способом избежать эффекта последней строки.

Вместо того, чтобы рендерить данные в методе subscribe() потока responseStream, мы сделаем это здесь:

suggestion1Stream.subscribe(function(suggestion) {
  // рендерим первую рекомендацию
});

Теперь мы можем обнулять рекомендацию при нажатии кнопки обновления:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // берем случайную из списка
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

Этот код будет интерпретировать null, как “нет данных” и скрывать DOM-элемент.

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // прячем DOM-элемент первой рекомендации
  }
  else {
    // показываем DOM-элемент первой рекомендации
    // и рендерим данные
  }
});

Вот диаграмма того, что мы только что реализовали:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

В данном случае N — это null.

В качестве бонуса мы также можем выводить “пустые” рекомендации при инициализации с помощью startWith(null):

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // берем случайного пользователя из списка
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

Удаление рекомендации и кэширование ответа сервера

Последняя функция, которую мы реализуем — удаление рекомендации. Напротив каждой рекомендации есть кнопка удаления, которая закрывает текущую рекомендацию и показывает вместо неё новую. Первая мысль, которая может у вас возникнуть — нужно делать новый запрос по нажатию этой кнопки:

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// то же самое для close2Button и close3Button

var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // we added this
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Это не сработает. Перезагрузятся все рекомендации. Существует несколько способов решения этой проблемы, и один из наиболее оптимальных — повторное использование предыдущего ответа сервера. Ответ API состоит из списка длиной в 100 элементов, из которых мы используем только 3. Нет надобности запрашивать новые данные, когда мы можем использовать 97 “свежих”.

Когда происходит нажатие кнопки “close1”, нам нужно использовать наиболее свежий ответ сервера, чтобы получить случайного пользователя из списка. Примерно так:

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

В Rx* есть функция-комбинатор combineLatest, которая принимает два потока A и B в качестве входных данных и, когда один из потоков генерирует значение, возвращает два наиболее свежих значения из A и B. Посмотрим на диаграмму того, что она делает:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

где f - функция uppercase()

Мы можем применить функцию combineLatest() на потоках close1ClickStream и responseStream для того, чтобы после клика по кнопке “close1” мы получали последний ответ сервера и генерировали новое значение в suggestion1Stream. С другой стороны combineLatest() симметрична: когда новый ответ генерируется в responseStream, он будет скомбинирован с последним кликом “close 1” и выведет новую рекомендацию. Это позволяет нам упростить наш код suggestion1Stream, который мы писали ранее:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

В этой схеме не хватает только одной детали: combineLatest() использует наиболее свежие данные из двух источников, но если один из источников еще не сгенерировал ни одного значения, combineLatest() не произведет события в поток вывода. Если вы посмотрите на диаграмму выше, вы можете заметить, что на выходе ничего нет, когда первый поток генерирует значение a. Только когда второй поток генерирует значение b, на выходе появляется значение.

Есть несколько способов решения этой проблемы и мы воспользуемся простейшим: симулированием клика кнопки “close1” при инициализации:

var suggestion1Stream = close1ClickStream.startWith('startup click') // мы добавили это
  .combineLatest(responseStream,             
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

Все, курс молодого бойца по реактивному программированию окончен! Вы можете взглянуть на рабочий код.

Перевод статьи «The introduction to Reactive Programming you've been missing»