Многопоточное программирование в Java 8. Часть третья. Атомарные переменные и конкурентные таблицы
88К открытий90К показов
Рассказывает Бенджамин Винтерберг, Software Engineer
Добро пожаловать в третью часть руководства по параллельному программированию в Java 8. В первой части мы рассматривали, как выполнять код параллельно с помощью потоков, задач и сервисов исполнителей. Во второй разбирались с тем, как синхронизировать доступ к изменяемым объектам с помощью ключевого слова synchronized
, блокировок и семафоров. Сегодня, в заключительной части, я расскажу о двух очень важных частях Concurrency API: об атомарных переменных и о конкурентных таблицах (Concurrent Maps).
AtomicInteger
Пакет java.concurrent.atomic
содержит много полезных классов для выполнения атомарных операций. Операция называется атомарной тогда, когда её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни synchronized
, как мы это делали в предыдущем уроке.
Внутри атомарные классы очень активно используют сравнение с обменом (compare-and-swap, CAS), атомарную инструкцию, которую поддерживает большинство современных процессоров. Эти инструкции работают гораздо быстрее, чем синхронизация с помощью блокировок. Поэтому, если вам просто нужно изменять одну переменную с помощью нескольких потоков, лучше выбирать атомарные классы.
Приведу несколько примеров с использованием AtomicInteger
, одного из атомарных классов:
Как видите, использование AtomicInteger
вместо обычного Integer
позволило нам корректно увеличить число, распределив работу сразу по двум потокам. Мы можем не беспокоиться о безопасности, потому что incrementAndGet()
является атомарной операцией.
Класс AtomicInteger
поддерживает много разных атомарных операций. Метод updateAndGet()
принимает в качестве аргумента лямбда-выражение и выполняет над числом заданные арифметические операции:
Метод accumulateAndGet()
принимает лямбда-выражения типа IntBinaryOperator
. Вот как мы можем использовать его, чтобы просуммировать все числа от нуля до тысячи:
Среди других атомарных классов хочется упомянуть такие как AtomicBoolean, AtomicLong и AtomicReference.
LongAdder
Класс LongAdder
может выступать в качестве альтернативы AtomicLong
для последовательного сложения чисел.
Так же, как и у других атомарных чисел, у LongAdder
есть методы increment()
и add()
. Но вместо того, чтобы складывать числа сразу, он просто хранит у себя набор слагаемых, чтобы уменьшить взаимодействие между потоками. Узнать результат можно с помощью вызова sum()
или sumThenReset()
. Этот класс используется в ситуациях, когда добавлять числа приходится гораздо чаще, чем запрашивать результат (часто это какие-то статистические исследование, например подсчёт количества запросов). Несложно догадаться, что, давая прирост в производительности, LongAdder
требует гораздо большего количества памяти из-за того, что он хранит все слагаемые.
LongAccumulator
Класс LongAccumulator
несколько расширяет возможности LongAdder
. Вместо простого сложения он обрабатывает входящие значения с помощью лямбды типа LongBinaryOperator
, которая передаётся при инициализации. Выглядит это так:
В этом примере при каждом вызове accumulate()
значение аккумулятора увеличивается в два раза, и лишь затем суммируется с i
. Так же, как и LongAdder
, LongAccumulator
хранит весь набор переданных значений в памяти.
прим. переводчика На самом деле, пример не совсем корректный; согласно документации, LongAccumulator не гарантирует порядка выполнения операций. Корректной формулой была бы, например x+2*y, т.к. при любом порядке выполнения в конце будет получаться одно и то же значение.
ConcurrentMap
Интерфейс ConcurrentMap
наследуется от обычного Map
и предоставляет описание одной из самой полезной коллекции для конкурентного использования. Чтобы продемонстрировать новые методы интерфейса, мы будем использовать вот эту заготовку:
Метод forEach()
принимает лямбду типа BiConsumer
. Этой лямбде будут передаваться в качестве аргументов все ключи и значения таблицы по очереди. Этот метод может использоваться как замена for-each циклам с итерацией по всем Entry
. Итерация выполняется последовательно, в текущем потоке. эту запятую не надо убирать. если вы считаете, что надо - пожалуйста, сначала скажите мне
Метод putIfAbsent()
помещает в таблицу значение, только если по данному ключу ещё нет другого значения. Этот метод является потокобезопасным (о крайней мере, в реализации ConcurrentHashMap
), поэтому вам не нужно использовать synchronized
, когда вы хотите использовать его в нескольких потоках (то же самое справедливо и для обычного put()
):
Метод getOrDefault()
работает так же, как и обычный get()
, с той лишь разницей, что при отсутствии значения по данному ключу он вернёт значение по-умолчанию, передаваемое вторым аргументом:
Метод replaceAll()
принимает в качестве аргумента лямбда-выражение типа BiFunction
. Этой лямбде по очереди передаются все комбинации ключ-значения из карты, а результат, который она возвращает, записывается соответствующему ключу в качестве значения:
Если же вам нужно изменить таким же образом только один ключ, это позволяет сделать метод compute()
:
Кроме обычного compute()
, существуют так же методы computeIfAbsent()
и computeIfPresent()
. Они изменяют значение только если значение по данному ключу отсутствует (или присутствует, соответственно).
И, наконец, метод merge()
, который может быть использован для объединения существующего ключа с новым значением. В качестве аргумента он принимает ключ, новое значение и лямбду, которая определяет, как новое значение должно быть объединено со старым:
ConcurrentHashMap
Кроме методов, которые описаны в ConcurrencyMap
, в ConcurrentHashMap
было добавлено и ещё несколько своих. Так же, как и параллельные stream’ы, эти методы используют специальный ForkJoinPool
, доступный через ForkJoinPool.commonPool()
в Java 8. Этот пул использует свои настройки для количества потоков, основанные на количестве ядер. У меня их 4, а значит использоваться будет три потока:
Это значение может быть специально изменено с помощью параметра JVM:
Мы рассмотрим три новых метода: forEach
, search
and reduce
. У каждого из них есть первый аргумент, который называется parallelismThreshold
, который определяет минимальное количество элементов в коллекции, при котором операция будет выполняться в нескольких потоках. Т.е. если в коллекции 499 элементов, а первый параметр выставлен равным пятистам, то операция будет выполняться в одном потоке последовательно. В наших примерах мы будем использовать первый параметр равным в единице, чтобы операции всегда выполнялись параллельно.
Для примеров ниже мы будем использовать всё ту же таблицу, что и выше (однако объявим её именем класса, а не интерфейса. чтобы нам были доступны все методы):
ForEach
Работает метод так же, как и в ConcurrentMap
. Для иллюстрации многопоточности мы будем выводить названия потоков (не забывайте, что их количество для меня ограничено тремя):
Search
Метод search()
принимает лямбда-выражение типа BiFunction
, в которую передаются все пары ключ-значение по очереди. Функция должна возвращать null
, если необходимое вхождение найдено. В случае, если функция вернёт не null
, дальнейший поиск будет остановлен. Не забывайте, что данные в хэш-таблице хранятся неупорядоченно. Если вы будете полагаться на порядок, в котором вы добавляли данные в неё, вы можете не получить ожидаемого результата. Если условиям поиска удовлетворяют несколько вхождений, результат точно предсказать нельзя.
Или вот другой пример, который полагается только на значения:
Reduce
Метод reduce()
вы могли уже встречать в Java 8 Streams. Он принимает две лямбды типа BiFunction
. Первая функция преобразовывает пару ключ/значение в один объект (любого типа). Вторая функция совмещает все полученные значения в единый результат, игнорируя любые возможные null
-значения.
На этом всё. Надеюсь, мои статьи были вам полезны ?
88К открытий90К показов