В многопоточном программировании, в том числе и на языке Java, одной из самых частых проблем является "состояние гонки" (race condition), когда несколько потоков пытаются одновременно читать и изменять одни и те же данные. Рассмотрим классический пример - простой счетчик:
public class Program {
public static void main(String[] args) throws InterruptedException{
int numThreads = 1000; // количество потоков
int incrementsPerThread = 1000; // количество итреаций в каждом потоке
Counter counter = new Counter();
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++){
threads[i] = new Thread(() ->{
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// небольшая задержка в 1 секунду, чтобы потоки успели поработать
Thread.sleep(1000);
// ожидаем завершения всех потоков
for (int i = 0; i < numThreads; i++){
threads[i].join();
}
// проверяем значение счетчика
System.out.println("Counter: " + counter.getCounter());
}
}
class Counter{
private volatile long counter = 0;
long getCounter() { return counter; }
void increment() { counter++; } // НЕ АТОМАРНАЯ ОПЕРАЦИЯ!
}
Итак, здесь у нас есть класс Counter - условного счетчика, где есть переменная counter
private volatile long counter = 0;
Чтобы все потоки вдели изменения этой переменной, она определена с ключевым словом volatile. А в методе increment() увеличиваем на 1 значение этой переменной:
void increment() { counter++; }
В методе main() используем этот класс, создая его объект:
Counter counter = new Counter();
Далее создаем массив из 1000 потоков, которые 1000 раз выполняют метод increment() у объекта Counter:
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++){
threads[i] = new Thread(() ->{
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
После завершения потоков выводим финальное значение счетчика:
System.out.println("Counter: " + counter.getCounter());
Если мы запускаем 1000 потоков, каждый из которых будет 1000 раз инкрементировать счетчик на единицу, то было бы разумно ожидать, что финальное значение счетчика будет равно 1000 * 1000 = 1000 000 (то есть 1 миллион). Но посмотрим, какой на самом деле будет результат. Он может быть таким:
Counter: 944695
А может быть и таким
Counter: 870973
А может быть и совсем другим. Результат в данном случае недетерминарован, но он может (и скорее всего будет) отличаться от ожидаемых 1000 000. Почему так происходит? Дело в том, что операция инкремента,
которую выполняет в примере выше
метод increment(), на самом деле она состоит из трех шагов:
Чтение текущего значения поля counter
Увеличение прочитанного значения на 1
Запись нового значения обратно в counter
И если два и более потоков выполняют инкремент одновременно, то они могут считать одно и то же значение.
Для решения этой проблемы мы могли бы использовать блоки/функции с оператором synchronized или использоваnm блокировокb ReentrantLock. Однако оба этих способа представляют
блокирующий механизм. И если поток А держит блокировку, поток Б (и В, Г, Д...) вынуждены остановиться и ждать. В системах с высокой нагрузкой
это приводит к простоям потоков, частым переключениям контекста и резкому падению производительности.
Для решения этой задачи язык Java предоставляет более эффективный инструмент - классы из пакета java.util.concurrent.atomic, такие как AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong и т.д. Эти классы используют эффективные машинные инструкции для гарантии атомарности других операций без использования блокировок. Предоставляемые этими классами операции над одиночными переменными неблокирующие и потокобезопасные (thread-safe)
Рассмотрим применение подобных классов на примере AtomicLong, который представляет обертку над значением типа long с атомарными методами для его изменения. Для создания объекта применяется один из конструкторов класса:
AtomicLong() AtomicLong(long initialValue)
Первая версия инициализирует объект числом 0. Второй конструктор принимает начальное значение для AtomicLong.
Ключевые методы AtomicLong:
long get(): возвращает текущее значение (аналогично чтению volatile long)
void set(long newValue): устанавливает новое значение (аналогично записи в volatile long)
И также класс предоставляет ряд атомарных операций, которые собственно и представляют суть класса:
long incrementAndGet(): атомарно увеличивает на 1 и возвращает новое значение (префиксный инкремент, ++i)
long getAndIncrement(): атомарно увеличивает на 1 и возвращает старое значение (постфиксный инкремент, i++)
long decrementAndGet(): атомарно уменьшает на 1 и возвращает новое (--i)
long getAndDecrement(): атомарно уменьшает на 1 и возвращает старое (i--)
long addAndGet(long delta): атомарно добавляет delta и возвращает новое (i += delta)
long getAndAdd(long delta): атомарно добавляет delta и возвращает старое
boolean compareAndSet(long expect, long update): пытается установить значение update, только если текущее значение равно expect. Возвращает true в случае успеха
Для применения атомарности и AtomicLong изменим предыдущий пример следующим образом:
import java.util.concurrent.atomic.AtomicLong;
public class Program {
public static void main(String[] args) throws InterruptedException{
int numThreads = 1000;
int incrementsPerThread = 1000;
Counter counter = new Counter();
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++){
threads[i] = new Thread(() ->{
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// небольшая задержка, чтобы потоки успели поработать
Thread.sleep(1000);
// ожидаем завершения всех потоков
for (int i = 0; i < numThreads; i++){
threads[i].join();
}
// проверяем значение счетчика
System.out.println("Counter: " + counter.getCounter());
}
}
class Counter{
// Правильный, потокобезопасный счетчик
private AtomicLong counter = new AtomicLong(0);
long getCounter() { return counter.get(); }
void increment() { counter.getAndIncrement(); } // АТОМАРНАЯ ОПЕРАЦИЯ!
}
Здесь изменен только код класса Counter - вместо типа long в качестве типа переменной применяется класс AtomicLong. Для создания объекта этого класса применяется конструктор,
в который передается начальное значение переменной:
AtomicLong counter = new AtomicLong(0);
То есть по умолчанию переменная counter равна 0.
В методе getCounter() с помощью метода get() возвращаем текущее значение счетчика:
long getCounter() { return counter.get(); }
А в методе increment() с помощью вызова getAndIncrement() увеличиваем значение счетчика на 1:
void increment() { counter.getAndIncrement(); }
Таким образом, при запуске потоков и выполнении метода increment:
for (int i = 0; i < numThreads; i++){
threads[i] = new Thread(() ->{
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment(); // Выполняем атомарную операцию
}
});
threads[i].start();
}
будет выполняться атомарная операция, даже если несколько потоков подряд попробуют вызвать метод increment()
И теперь консольный вывод будет детерминирован и собственно тем, каким и должен быть:
Counter: 1000000
Как видно из примера, AtomicLong дает корректный результат, в то время как volatile long теряет инкременты из-за состояния гонки.
Атомарные типы, например, тот же AtomicLong, очень удобно использовать для создания и управления счетчиками производительности, для генерации идентфиикаторов.
Но следует учитывать, что при наличии очень большого количества потоков, которые обращаются к одним и тем же атомарным значениям, производительность может снижаться. Для увеличения производительности можно
использовать классы LongAdder и LongAccumulator.
Например, класс LongAdder используется для сценария исключительно подсчета, когда не нужно мгновенно знать точное значение, а важен только итоговый результат (сумма всех значений). Это эффективно в распространtнной ситуации, когда значение суммы не требуется до завершения всей работы. Повышение в этом случае производительности может быть существенным.
Под капотом LongAdder хранит не одно значение, а массив значений (ячеек), общая сумма которых является текущим значением. Каждый поток работает со своей ячейкой, и коллизии (конкуренция)
возникают гораздо реже. А при вызове метода sum() LongAdder просто складывает значения всех ячеек.
Вкратце пробежимся по функционалу класса LongAdder:
void add(long x): добавляет заданное значение.
void decrement(): эквивалентно add(-1).
double doubleValue(): возвращает sum() как значение типа double после расширяющего преобразования.
float floatValue(): Возвращает sum() как значение типа float после расширяющего преобразования.
void increment(): эквивалентно add(1).
int intValue(): возвращает sum() как значение типа int после сужающего преобразования.
long longValue(): эквивалентно sum().
void reset(): сбрасывает переменные, сохраняя сумму равной нулю.
long sum(): возвращает текущую сумму.
long sumThenReset(): эквивалентно sum(), за которым следует reset().
Так, в примере выше не требовалось получать промежуточное значение после каждого инкремента, поэтому мы можем заменить AtomicLong на LongAdder:
import java.util.concurrent.atomic.LongAdder;
public class Program {
public static void main(String[] args) throws InterruptedException{
int numThreads = 1000;
int incrementsPerThread = 1000;
Counter counter = new Counter();
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++){
threads[i] = new Thread(() ->{
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// небольшая задержка, чтобы потоки успели поработать
Thread.sleep(1000);
// ожидаем завершения всех потоков
for (int i = 0; i < numThreads; i++){
threads[i].join();
}
// проверяем значение счетчика
System.out.println("Counter: " + counter.getCounter());
}
}
class Counter{
private LongAdder counter = new LongAdder();
long getCounter() { return counter.longValue(); } // возвращаем сумму всех значений (инкрементов) потоков
void increment() { counter.increment(); } // увеличиваем на 1
}
Фактически здесь все то же самое, только вместо AtomicLong для определения счетчика применяется LongAdder. С помощью метода longValue() возвращаем значение счетчика (по сути сумму значений всех потоков).
А с помощью метода increment() каждый поток будет увеличивать свое значение в счетчике.
А консольный вывод также будет детерминирован, каким и должен быть:
Counter: 1000000