ExecutorService

Создавать Thread вручную для каждой задачи — неэффективно и сложно управлять. Пакет java.util.concurrent даёт ExecutorService — высокоуровневую абстракцию: вы отдаёте задачи (Runnable или Callable), а пул сам распределяет их по своим рабочим потокам.

В AlashEd-проектах это полезно, например, когда нужно одновременно опросить несколько датчиков по сети, отправить результаты в облако и обработать команды от пользователя — пул в 4-8 потоков справится со всеми задачами без постоянного пересоздания Thread-объектов.

Фабричные методы Executors создают типовые пулы: newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor, newScheduledThreadPool.

Синтаксис

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService pool = Executors.newFixedThreadPool(4);
pool.submit(() -> { /* задача */ });
pool.shutdown();   // не принимать новые задачи; ждать завершения текущих

Пример 1. Fixed Thread Pool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        for (int i = 1; i <= 4; i++) {
            final int id = i;
            pool.submit(() -> {
                System.out.println("task " + id + " на " + Thread.currentThread().getName());
                try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            });
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("все задачи завершены");
    }
}

Output:

task 1 на pool-1-thread-1
task 2 на pool-1-thread-2
task 3 на pool-1-thread-1
task 4 на pool-1-thread-2
все задачи завершены

Пример 2. Single Thread Executor (последовательная очередь)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleExecutorDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService logger = Executors.newSingleThreadExecutor();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            logger.submit(() -> System.out.println("log " + n));
        }
        logger.shutdown();
        logger.awaitTermination(2, TimeUnit.SECONDS);
    }
}

Output:

log 1
log 2
log 3

Пример 3. ScheduledExecutorService — периодический опрос

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);

        sched.scheduleAtFixedRate(
                () -> System.out.println("опрос датчика " + System.currentTimeMillis() % 10000),
                0, 200, TimeUnit.MILLISECONDS);

        Thread.sleep(700);
        sched.shutdownNow();
    }
}

Output (значения tick’ов меняются):

опрос датчика 4123
опрос датчика 4323
опрос датчика 4523
опрос датчика 4723

Пример 4. CachedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            pool.submit(() -> System.out.println("task " + id
                    + " -> " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }
}

Output:

task 0 -> pool-1-thread-1
task 1 -> pool-1-thread-2
task 2 -> pool-1-thread-3

Пример 5. Graceful shutdown

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                try { Thread.sleep(200); } catch (InterruptedException ignored) {}
                System.out.println("done " + Thread.currentThread().getName());
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("pool остановлен");
    }
}

Output:

done pool-1-thread-1
done pool-1-thread-2
done pool-1-thread-1
pool остановлен

Подводные камни

Предупреждение

Без shutdown() приложение не завершится — рабочие потоки пула не являются демонами и удерживают JVM.

Предупреждение

newCachedThreadPool() создаёт неограниченное число потоков. При потоке задач это может исчерпать память. В production обычно используют new ThreadPoolExecutor(...) с явными границами.

Совет

Не глотайте исключения внутри задач: оборачивайте код в try/catch и логируйте, иначе ошибки в Runnable останутся незамеченными.

См. также

Примечание

Материал подготовлен на основе раздела «Concurrency» из Oracle Java Tutorials (docs.oracle.com/javase/tutorial/essential/concurrency/) и распространяется в соответствии с Oracle Free Documentation License. Тексты и примеры кода написаны заново на русском языке для AlashEd Wiki.