Параллельные коллекции
Стандартные коллекции (ArrayList, HashMap, LinkedList) не
безопасны при одновременном изменении из нескольких потоков — это может
привести к порче данных или ConcurrentModificationException. Пакет
java.util.concurrent содержит специализированные коллекции, рассчитанные
на многопоточный доступ: ConcurrentHashMap, CopyOnWriteArrayList,
ArrayBlockingQueue, LinkedBlockingQueue, ConcurrentLinkedQueue
и другие.
Принципы у них разные. Одни (ConcurrentHashMap) используют тонкую
блокировку сегментов, чтобы много потоков могли работать параллельно.
Другие (CopyOnWriteArrayList) при изменении создают новую копию массива —
это медленные записи, но мгновенное чтение без блокировок. BlockingQueue
блокирует поток, если очередь пуста или переполнена, что идеально подходит
для шаблона producer-consumer (например, поток-датчик кладёт измерения,
поток-обработчик их забирает).
Сводная таблица
Коллекция | Назначение | Особенность
-------------------------+--------------------------+-----------------------------
ConcurrentHashMap | потокобезопасный Map | блокировка по бакетам
CopyOnWriteArrayList | редкие записи, частые | копия массива при записи
| чтения |
ArrayBlockingQueue | очередь фикс. размера | put/take блокируют поток
LinkedBlockingQueue | очередь произв. размера | put/take блокируют поток
ConcurrentLinkedQueue | unbounded, неблокир. | lock-free CAS
Пример 1. ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
public class HashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> stats = new ConcurrentHashMap<>();
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
stats.merge("hits", 1, Integer::sum);
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println("hits = " + stats.get("hits"));
}
}
Output:
hits = 2000
Пример 2. ArrayBlockingQueue — producer/consumer
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
queue.put(i);
System.out.println("put " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
System.out.println("take " + queue.take());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
Output (порядок может слегка отличаться):
put 1
put 2
put 3
take 1
put 4
take 2
put 5
take 3
take 4
take 5
Пример 3. CopyOnWriteArrayList
import java.util.concurrent.CopyOnWriteArrayList;
public class CowDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("first");
list.add("second");
// Итерация безопасна даже при параллельной модификации:
new Thread(() -> list.add("third")).start();
for (String s : list) {
System.out.println(s);
}
}
}
Output (третий элемент может появиться в этой итерации или нет, но
ConcurrentModificationException не возникнет):
first
second
Пример 4. Обмен данными между потоками-датчиками
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class SensorPipeline {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Double> readings = new LinkedBlockingQueue<>();
Thread sensor = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
double v = 20.0 + i * 0.5;
readings.offer(v);
System.out.println("датчик отправил " + v);
try { Thread.sleep(100); } catch (InterruptedException ignored) {}
}
});
Thread processor = new Thread(() -> {
try {
for (int i = 0; i < 3; i++) {
Double v = readings.take();
System.out.println("обработал " + v);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
sensor.start();
processor.start();
sensor.join();
processor.join();
}
}
Output:
датчик отправил 20.5
обработал 20.5
датчик отправил 21.0
обработал 21.0
датчик отправил 21.5
обработал 21.5
Пример 5. computeIfAbsent для безопасного кэша
import java.util.concurrent.ConcurrentHashMap;
public class CacheDemo {
private static final ConcurrentHashMap<Integer, String> cache = new ConcurrentHashMap<>();
static String fetch(int id) {
return cache.computeIfAbsent(id, key -> {
System.out.println("вычисляю значение для " + key);
return "value-" + key;
});
}
public static void main(String[] args) {
System.out.println(fetch(1));
System.out.println(fetch(1)); // из кэша
System.out.println(fetch(2));
}
}
Output:
вычисляю значение для 1
value-1
value-1
вычисляю значение для 2
value-2
Подводные камни
Предупреждение
Collections.synchronizedMap(new HashMap<>()) — старое решение с одной
глобальной блокировкой. Для многопоточного доступа предпочитайте
ConcurrentHashMap — он масштабируется лучше.
Предупреждение
CopyOnWriteArrayList дёшев для чтений, но каждое add() создаёт
копию массива. Не используйте его там, где много записей.
Совет
BlockingQueue — идеальный «буфер» между потоком, читающим датчик, и
потоком, обрабатывающим данные: автоматически ждёт, когда есть/нет
элементов, и не требует ручной синхронизации.
См. также
Примечание
Материал подготовлен на основе раздела «Concurrency» из Oracle Java Tutorials (docs.oracle.com/javase/tutorial/essential/concurrency/) и распространяется в соответствии с Oracle Free Documentation License. Тексты и примеры кода написаны заново на русском языке для AlashEd Wiki.