Callable и Future

Callable<V> — функциональный интерфейс из java.util.concurrent, очень похож на Runnable, но его метод call() возвращает значение типа V и может бросать проверяемые исключения. Future<V> — это «обещание» результата: объект, через который можно дождаться завершения задачи, получить значение или отменить её.

Когда мы запускаем задачу в ExecutorService через submit(callable), вызов сразу возвращает Future. Если потребитель попытается прочитать результат до завершения задачи — он будет ждать. Этот шаблон удобен, когда в AlashEd-проекте нужно одновременно прочитать значения с трёх датчиков и затем сложить их вместе.

Синтаксис

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

ExecutorService pool = Executors.newFixedThreadPool(2);
Callable<Integer> task = () -> 42;
Future<Integer> f = pool.submit(task);
int result = f.get();   // блокирует до готовности

Пример 1. Простой Callable

import java.util.concurrent.*;

public class SimpleCallable {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> task = () -> {
            Thread.sleep(200);
            return 7 * 6;
        };

        Future<Integer> f = pool.submit(task);
        System.out.println("ждём результат...");
        System.out.println("ответ: " + f.get());

        pool.shutdown();
    }
}

Output:

ждём результат...
ответ: 42

Пример 2. Несколько задач — invokeAll

import java.util.*;
import java.util.concurrent.*;

public class InvokeAllDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            tasks.add(() -> {
                Thread.sleep(100);
                return n * n;
            });
        }

        List<Future<Integer>> results = pool.invokeAll(tasks);
        for (Future<Integer> f : results) {
            System.out.println("результат: " + f.get());
        }
        pool.shutdown();
    }
}

Output:

результат: 1
результат: 4
результат: 9

Пример 3. Таймаут на ожидание

import java.util.concurrent.*;

public class FutureTimeout {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> f = pool.submit(() -> {
            Thread.sleep(1000);
            return "готово";
        });

        try {
            System.out.println(f.get(200, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            System.out.println("не дождались");
            f.cancel(true);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdownNow();
    }
}

Output:

не дождались

Пример 4. Обработка исключения внутри задачи

import java.util.concurrent.*;

public class FutureException {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> f = pool.submit(() -> {
            throw new IllegalStateException("датчик не отвечает");
        });

        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("причина: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
    }
}

Output:

причина: датчик не отвечает

Пример 5. Параллельное чтение трёх датчиков

import java.util.concurrent.*;

public class ParallelSensors {
    static int readSensor(int id) throws InterruptedException {
        Thread.sleep(200);
        return id * 10;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        Future<Integer> s1 = pool.submit(() -> readSensor(1));
        Future<Integer> s2 = pool.submit(() -> readSensor(2));
        Future<Integer> s3 = pool.submit(() -> readSensor(3));

        long t0 = System.currentTimeMillis();
        int sum = s1.get() + s2.get() + s3.get();
        long dt = System.currentTimeMillis() - t0;

        System.out.println("сумма = " + sum);
        System.out.println("заняло ~" + (dt / 100 * 100) + " мс (а не 600)");
        pool.shutdown();
    }
}

Output:

сумма = 60
заняло ~200 мс (а не 600)

Подводные камни

Предупреждение

Future.get() блокирует вызывающий поток. Если задача зависнет — зависнет и вызывающий код. Используйте перегрузку get(timeout, unit).

Предупреждение

Исключение из задачи доходит до вызывающего обёрнутое в ExecutionException. Реальную причину получите через getCause().

Совет

Для цепочки асинхронных вызовов вместо Future удобнее CompletableFuture (Java 8+) — поддерживает thenApply, thenCombine, exceptionally.

См. также

Примечание

Материал подготовлен на основе раздела «Concurrency» из Oracle Java Tutorials (docs.oracle.com/javase/tutorial/essential/concurrency/) и распространяется в соответствии с Oracle Free Documentation License. Тексты и примеры кода написаны заново на русском языке для AlashEd Wiki.