跳到主要内容

14、Java 21 新特性 - 结构化并发(预览)

通过引入用于结构化并发(Structured Concurrency)的API来简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单个工作单元,从而简化错误处理和消除,提高可靠性,并增强可观察性。这是一个预览API。

结构化并发是一种旨在通过提供结构化且易于遵循的方法来简化并发编程的编程范例。使用结构化并发,开发人员可以创建更容易理解和调试、不容易出现竞态条件和其他与并发相关的错误的并发代码。在结构化并发中,所有并发代码都被结构化为称为任务的明确定义的工作单元。任务以结构化的方式创建、执行和完成,任务的执行始终保证在其父任务完成之前完成。

结构化并发可以使多线程编程更加简单和可靠。在传统的多线程编程中,线程的启动、执行和终止都是由开发人员手动管理的,因此容易出现线程泄漏、死锁和不正确的异常处理等问题。

使用结构化并发,开发人员可以更自然地组织并发任务,使任务之间的依赖关系更清晰,代码逻辑更简洁。结构化并发还提供了一些异常处理机制,以更好地管理并发任务中的异常,避免由异常引起的程序崩溃或数据不一致。

此外,结构化并发还可以通过限制并发任务的数量和优先级来防止资源

竞争和饥饿现象。这些特性使得开发人员能够更容易地实现高效且可靠的并发程序,而不必过多关注底层线程管理。

结构化并发

想象以下情景。假设您有三个任务需要同时执行。只要任何一个任务完成并返回结果,就可以直接使用该结果,可以停止其他两个任务。例如,一个天气服务通过三个渠道获取天气情况,只要一个渠道返回即可。

在这种情况下,在Java 8下应该做什么,当然也是可以的。

List<Future<?>> futures = executor.invokeAll(tasks);
String result = executor.invokeAny(tasks);

使用ExecutorService的invokeAll和invokeAny方法实现,但会有一些额外的工作。在获取第一个结果后,您需要手动关闭另一个线程。

在JDK 21中,可以使用结构化编程来实现。

ShutdownOnSuccess捕获第一个结果并关闭任务范围以中断未完成的线程并唤醒调用线程。

一种情况是任何子任务的结果都可以直接使用,而无需等待其他未完成任务的结果。

它定义了获取第一个结果或在所有子任务失败时抛出异常的方法。

public static void main(String[] args) throws IOException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
        Future<String> res1 = scope.fork(() -> runTask(1));
        Future<String> res2 = scope.fork(() -> runTask(2));
        Future<String> res3 = scope.fork(() -> runTask(3));
        scope.join();
        System.out.println("scope:" + scope.result());
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}
 
 
public static String runTask(int i) throws InterruptedException {
    Thread.sleep(
 
 
1000);
    long l = new Random().nextLong();
    String s = String.valueOf(l);
    System.out.println(i + "task:" + s);
    return s;
}

ShutdownOnFailure

执行多个任务,只要有一个失败(发生异常或引发其他活动异常),就停止其他未完成的任务,并使用scope.throwIfFailed来捕获并抛出异常。

如果所有任务都正常,可以使用Feture.get()或*Feture.resultNow()来获取结果。

public static void main(String[] args) throws IOException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> res1 = scope.fork(() -> runTaskWithException(1));
        Future<String> res2 = scope.fork(() -> runTaskWithException(2));
        Future<String> res3 = scope.fork(() -> runTaskWithException(3));
        scope.join();
        scope.throwIfFailed(Exception::new);
 
 
        String s = res1.resultNow();
        System.out.println(s);
 
 
        String result = Stream.of(res1, res2, res3)
                             .map(Future::resultNow)
                             .collect(Collectors.joining());
        System.out.println("result:" + result);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
 
public static String runTaskWithException(int i) throws InterruptedException {
    Thread.sleep(1000);
    long l = new Random().nextLong(3);
    if (l == 0) {
        throw new InterruptedException();
    }
    String s = String.valueOf(l);
    System.out.println(i + "task:" + s);
    return s;
}