Многопоточное программирование в Java 8. Часть третья. Атомарные переменные и конкурентные таблицы
90К открытий94К показов
Рассказывает Бенджамин Винтерберг, Software Engineer
Добро пожаловать в третью часть руководства по параллельному программированию в Java 8. В первой части мы рассматривали, как выполнять код параллельно с помощью потоков, задач и сервисов исполнителей. Во второй разбирались с тем, как синхронизировать доступ к изменяемым объектам с помощью ключевого слова synchronized, блокировок и семафоров. Сегодня, в заключительной части, я расскажу о двух очень важных частях Concurrency API: об атомарных переменных и о конкурентных таблицах (Concurrent Maps).
AtomicInteger
Пакет java.concurrent.atomic содержит много полезных классов для выполнения атомарных операций. Операция называется атомарной тогда, когда её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни synchronized, как мы это делали в предыдущем уроке.
Внутри атомарные классы очень активно используют сравнение с обменом (compare-and-swap, CAS), атомарную инструкцию, которую поддерживает большинство современных процессоров. Эти инструкции работают гораздо быстрее, чем синхронизация с помощью блокировок. Поэтому, если вам просто нужно изменять одну переменную с помощью нескольких потоков, лучше выбирать атомарные классы.
Приведу несколько примеров с использованием AtomicInteger, одного из атомарных классов:
Как видите, использование AtomicInteger вместо обычного Integer позволило нам корректно увеличить число, распределив работу сразу по двум потокам. Мы можем не беспокоиться о безопасности, потому что incrementAndGet() является атомарной операцией.
Класс AtomicInteger поддерживает много разных атомарных операций. Метод updateAndGet() принимает в качестве аргумента лямбда-выражение и выполняет над числом заданные арифметические операции:
Метод accumulateAndGet() принимает лямбда-выражения типа IntBinaryOperator. Вот как мы можем использовать его, чтобы просуммировать все числа от нуля до тысячи:
Среди других атомарных классов хочется упомянуть такие как AtomicBoolean, AtomicLong и AtomicReference.
LongAdder
Класс LongAdder может выступать в качестве альтернативы AtomicLong для последовательного сложения чисел.
Так же, как и у других атомарных чисел, у LongAdder есть методы increment() и add(). Но вместо того, чтобы складывать числа сразу, он просто хранит у себя набор слагаемых, чтобы уменьшить взаимодействие между потоками. Узнать результат можно с помощью вызова sum() или sumThenReset(). Этот класс используется в ситуациях, когда добавлять числа приходится гораздо чаще, чем запрашивать результат (часто это какие-то статистические исследование, например подсчёт количества запросов). Несложно догадаться, что, давая прирост в производительности, LongAdder требует гораздо большего количества памяти из-за того, что он хранит все слагаемые.
LongAccumulator
Класс LongAccumulator несколько расширяет возможности LongAdder. Вместо простого сложения он обрабатывает входящие значения с помощью лямбды типа LongBinaryOperator, которая передаётся при инициализации. Выглядит это так:
В этом примере при каждом вызове accumulate() значение аккумулятора увеличивается в два раза, и лишь затем суммируется с i. Так же, как и LongAdder, LongAccumulator хранит весь набор переданных значений в памяти.
прим. переводчика На самом деле, пример не совсем корректный; согласно документации, LongAccumulator не гарантирует порядка выполнения операций. Корректной формулой была бы, например x+2*y, т.к. при любом порядке выполнения в конце будет получаться одно и то же значение.
ConcurrentMap
Интерфейс ConcurrentMap наследуется от обычного Map и предоставляет описание одной из самой полезной коллекции для конкурентного использования. Чтобы продемонстрировать новые методы интерфейса, мы будем использовать вот эту заготовку:
Метод forEach() принимает лямбду типа BiConsumer. Этой лямбде будут передаваться в качестве аргументов все ключи и значения таблицы по очереди. Этот метод может использоваться как замена for-each циклам с итерацией по всем Entry. Итерация выполняется последовательно, в текущем потоке. эту запятую не надо убирать. если вы считаете, что надо - пожалуйста, сначала скажите мне
Метод putIfAbsent() помещает в таблицу значение, только если по данному ключу ещё нет другого значения. Этот метод является потокобезопасным (о крайней мере, в реализации ConcurrentHashMap), поэтому вам не нужно использовать synchronized, когда вы хотите использовать его в нескольких потоках (то же самое справедливо и для обычного put()):
Метод getOrDefault() работает так же, как и обычный get(), с той лишь разницей, что при отсутствии значения по данному ключу он вернёт значение по-умолчанию, передаваемое вторым аргументом:
Метод replaceAll() принимает в качестве аргумента лямбда-выражение типа BiFunction. Этой лямбде по очереди передаются все комбинации ключ-значения из карты, а результат, который она возвращает, записывается соответствующему ключу в качестве значения:
Если же вам нужно изменить таким же образом только один ключ, это позволяет сделать метод compute():
Кроме обычного compute(), существуют так же методы computeIfAbsent() и computeIfPresent(). Они изменяют значение только если значение по данному ключу отсутствует (или присутствует, соответственно).
И, наконец, метод merge(), который может быть использован для объединения существующего ключа с новым значением. В качестве аргумента он принимает ключ, новое значение и лямбду, которая определяет, как новое значение должно быть объединено со старым:
ConcurrentHashMap
Кроме методов, которые описаны в ConcurrencyMap, в ConcurrentHashMap было добавлено и ещё несколько своих. Так же, как и параллельные stream’ы, эти методы используют специальный ForkJoinPool, доступный через ForkJoinPool.commonPool() в Java 8. Этот пул использует свои настройки для количества потоков, основанные на количестве ядер. У меня их 4, а значит использоваться будет три потока:
Это значение может быть специально изменено с помощью параметра JVM:
Мы рассмотрим три новых метода: forEach, search and reduce. У каждого из них есть первый аргумент, который называется parallelismThreshold, который определяет минимальное количество элементов в коллекции, при котором операция будет выполняться в нескольких потоках. Т.е. если в коллекции 499 элементов, а первый параметр выставлен равным пятистам, то операция будет выполняться в одном потоке последовательно. В наших примерах мы будем использовать первый параметр равным в единице, чтобы операции всегда выполнялись параллельно.
Для примеров ниже мы будем использовать всё ту же таблицу, что и выше (однако объявим её именем класса, а не интерфейса. чтобы нам были доступны все методы):
ForEach
Работает метод так же, как и в ConcurrentMap. Для иллюстрации многопоточности мы будем выводить названия потоков (не забывайте, что их количество для меня ограничено тремя):
Search
Метод search() принимает лямбда-выражение типа BiFunction, в которую передаются все пары ключ-значение по очереди. Функция должна возвращать null, если необходимое вхождение найдено. В случае, если функция вернёт не null, дальнейший поиск будет остановлен. Не забывайте, что данные в хэш-таблице хранятся неупорядоченно. Если вы будете полагаться на порядок, в котором вы добавляли данные в неё, вы можете не получить ожидаемого результата. Если условиям поиска удовлетворяют несколько вхождений, результат точно предсказать нельзя.
Или вот другой пример, который полагается только на значения:
Reduce
Метод reduce() вы могли уже встречать в Java 8 Streams. Он принимает две лямбды типа BiFunction. Первая функция преобразовывает пару ключ/значение в один объект (любого типа). Вторая функция совмещает все полученные значения в единый результат, игнорируя любые возможные null-значения.
На этом всё. Надеюсь, мои статьи были вам полезны ?
90К открытий94К показов



