Параллельные коллекции

Стандартные коллекции (ArrayList, HashMap, LinkedList) не безопасны при одновременном изменении из нескольких потоков — это может привести к порче данных или ConcurrentModificationException. Пакет java.util.concurrent содержит специализированные коллекции, рассчитанные на многопоточный доступ: ConcurrentHashMap, CopyOnWriteArrayList, ArrayBlockingQueue, LinkedBlockingQueue, ConcurrentLinkedQueue и другие.

Принципы у них разные. Одни (ConcurrentHashMap) используют тонкую блокировку сегментов, чтобы много потоков могли работать параллельно. Другие (CopyOnWriteArrayList) при изменении создают новую копию массива — это медленные записи, но мгновенное чтение без блокировок. BlockingQueue блокирует поток, если очередь пуста или переполнена, что идеально подходит для шаблона producer-consumer (например, поток-датчик кладёт измерения, поток-обработчик их забирает).

Сводная таблица

Коллекция                | Назначение               | Особенность
-------------------------+--------------------------+-----------------------------
ConcurrentHashMap        | потокобезопасный Map     | блокировка по бакетам
CopyOnWriteArrayList     | редкие записи, частые    | копия массива при записи
                         | чтения                   |
ArrayBlockingQueue       | очередь фикс. размера    | put/take блокируют поток
LinkedBlockingQueue      | очередь произв. размера  | put/take блокируют поток
ConcurrentLinkedQueue    | unbounded, неблокир.     | lock-free CAS

Пример 1. ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;

public class HashMapDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> stats = new ConcurrentHashMap<>();

        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                stats.merge("hits", 1, Integer::sum);
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();

        System.out.println("hits = " + stats.get("hits"));
    }
}

Output:

hits = 2000

Пример 2. ArrayBlockingQueue — producer/consumer

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);
                    System.out.println("put " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(100);
                    System.out.println("take " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

Output (порядок может слегка отличаться):

put 1
put 2
put 3
take 1
put 4
take 2
put 5
take 3
take 4
take 5

Пример 3. CopyOnWriteArrayList

import java.util.concurrent.CopyOnWriteArrayList;

public class CowDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("first");
        list.add("second");

        // Итерация безопасна даже при параллельной модификации:
        new Thread(() -> list.add("third")).start();

        for (String s : list) {
            System.out.println(s);
        }
    }
}

Output (третий элемент может появиться в этой итерации или нет, но ConcurrentModificationException не возникнет):

first
second

Пример 4. Обмен данными между потоками-датчиками

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SensorPipeline {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Double> readings = new LinkedBlockingQueue<>();

        Thread sensor = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                double v = 20.0 + i * 0.5;
                readings.offer(v);
                System.out.println("датчик отправил " + v);
                try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            }
        });

        Thread processor = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    Double v = readings.take();
                    System.out.println("обработал " + v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        sensor.start();
        processor.start();
        sensor.join();
        processor.join();
    }
}

Output:

датчик отправил 20.5
обработал 20.5
датчик отправил 21.0
обработал 21.0
датчик отправил 21.5
обработал 21.5

Пример 5. computeIfAbsent для безопасного кэша

import java.util.concurrent.ConcurrentHashMap;

public class CacheDemo {
    private static final ConcurrentHashMap<Integer, String> cache = new ConcurrentHashMap<>();

    static String fetch(int id) {
        return cache.computeIfAbsent(id, key -> {
            System.out.println("вычисляю значение для " + key);
            return "value-" + key;
        });
    }

    public static void main(String[] args) {
        System.out.println(fetch(1));
        System.out.println(fetch(1));   // из кэша
        System.out.println(fetch(2));
    }
}

Output:

вычисляю значение для 1
value-1
value-1
вычисляю значение для 2
value-2

Подводные камни

Предупреждение

Collections.synchronizedMap(new HashMap<>()) — старое решение с одной глобальной блокировкой. Для многопоточного доступа предпочитайте ConcurrentHashMap — он масштабируется лучше.

Предупреждение

CopyOnWriteArrayList дёшев для чтений, но каждое add() создаёт копию массива. Не используйте его там, где много записей.

Совет

BlockingQueue — идеальный «буфер» между потоком, читающим датчик, и потоком, обрабатывающим данные: автоматически ждёт, когда есть/нет элементов, и не требует ручной синхронизации.

См. также

Примечание

Материал подготовлен на основе раздела «Concurrency» из Oracle Java Tutorials (docs.oracle.com/javase/tutorial/essential/concurrency/) и распространяется в соответствии с Oracle Free Documentation License. Тексты и примеры кода написаны заново на русском языке для AlashEd Wiki.