跳到主要内容

06、Java 19 新特性 - 结构化并发(孵化器)

如果一个任务由不同的子任务组成,可以并行完成(例如,从数据库访问数据、调用远程 API 和加载文件),我们可以使用 Java 多线程的一些工具类来完成。

例如:

    private final ExecutorService executor = Executors.newCachedThreadPool();

    // jdk 19 之前
    public Invoice createInvoice(int orderId, int customerId, String language) throws ExecutionException, InterruptedException {
   
     
        Future<Order> orderFuture = executor.submit(() -> loadOrderFromOrderService(orderId));

        Future<Customer> customerFuture = executor.submit(() -> loadCustomerFromDatabase(customerId));

        Future<String> invoiceTemplateFuture = executor.submit(() -> loadInvoiceTemplateFromFile(language));

        Order order = orderFuture.get();
        Customer customer = customerFuture.get();
        String invoiceTemplate = invoiceTemplateFuture.get();

        return Invoice.generate(order, customer, invoiceTemplate);
    }

但是:

如果一个子任务发生错误–我们如何取消其他子任务?

如果某个子任务不需要了,我们如何取消这个子任务呢?

这两种情况都可以,但需要相当复杂和难以维护的代码。

而如果我们想对这种类型的并发代码进行调试也非常麻烦。

[JDK Enhancement Proposal 428][]为所谓的 "结构化并发 "引入了一个 API,这个概念旨在改善这种类型需求的代码的实现、可读性和可维护性。

使用StructuredTaskScope,我们可以把这个例子改写成如下。

    public Invoice createInvoiceSinceJava19(int orderId, int customerId, String language)
            throws ExecutionException, InterruptedException {
   
     
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
   
     
            Future<Order> orderFuture =
                    scope.fork(() -> loadOrderFromOrderService(orderId));

            Future<Customer> customerFuture =
                    scope.fork(() -> loadCustomerFromDatabase(customerId));

            Future<String> invoiceTemplateFuture =
                    scope.fork(() -> loadInvoiceTemplateFromFile(language));

            scope.join();
            scope.throwIfFailed();

            Order order = orderFuture.resultNow();
            Customer customer = customerFuture.resultNow();
            String invoiceTemplate = invoiceTemplateFuture.resultNow();

            return new Invoice(order, customer, invoiceTemplate);
        }
    }

使用StructuredTaskScope,我们可以将 executor.submit() 替换为 scope.fork()

使用scope.join(),我们等待所有任务完成–或者至少有一个任务失败或被取消。在后两种情况下,随后的 throwIfFailed() 会抛出一个 ExecutionException 或一个 CancellationException

与旧方法相比,新方法带来了以下改进。

1、 任务和子任务在代码中形成一个独立的单元,每个子任务都在一个新的虚拟线程中执行;
2、 一旦其中一个子任务发生错误,所有其他子任务都会被取消;
3、 当调用线程被取消时,子任务也会被取消;

完整代码如下:

package git.snippets.jdk19;

import jdk.incubator.concurrent.StructuredTaskScope;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 预览功能
 * 控制台运行
 * 1. 配置Java运行环境是JDK 19
 * 2. 注释掉 package 路径
 * 3. 在本代码的目录下执行
 * 编译:javac --enable-preview -source 19 --add-modules jdk.incubator.concurrent StructuredConcurrencyTest.java
 *运行:java --enable-preview --add-modules jdk.incubator.concurrent StructuredConcurrencyTest
 */
public class StructuredConcurrencyTest {
   
     
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
     
        new StructuredConcurrencyTest().createInvoiceSinceJava19(1, 2, "ZH");
    }

    private final ExecutorService executor = Executors.newCachedThreadPool();

    // jdk 19 之前
    public Invoice createInvoice(int orderId, int customerId, String language) throws ExecutionException, InterruptedException {
   
     
        Future<Order> orderFuture = executor.submit(() -> loadOrderFromOrderService(orderId));

        Future<Customer> customerFuture = executor.submit(() -> loadCustomerFromDatabase(customerId));

        Future<String> invoiceTemplateFuture = executor.submit(() -> loadInvoiceTemplateFromFile(language));

        Order order = orderFuture.get();
        Customer customer = customerFuture.get();
        String invoiceTemplate = invoiceTemplateFuture.get();

        return Invoice.generate(order, customer, invoiceTemplate);
    }
    // jdk 19 之后
    public Invoice createInvoiceSinceJava19(int orderId, int customerId, String language)
            throws ExecutionException, InterruptedException {
   
     
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
   
     
            Future<Order> orderFuture =
                    scope.fork(() -> loadOrderFromOrderService(orderId));

            Future<Customer> customerFuture =
                    scope.fork(() -> loadCustomerFromDatabase(customerId));

            Future<String> invoiceTemplateFuture =
                    scope.fork(() -> loadInvoiceTemplateFromFile(language));

            scope.join();
            scope.throwIfFailed();

            Order order = orderFuture.resultNow();
            Customer customer = customerFuture.resultNow();
            String invoiceTemplate = invoiceTemplateFuture.resultNow();

            return new Invoice(order, customer, invoiceTemplate);
        }
    }

    private String loadInvoiceTemplateFromFile(String language) {
   
     
        return language;
    }

    private Customer loadCustomerFromDatabase(int customerId) {
   
     
        return new Customer(customerId);
    }

    private Order loadOrderFromOrderService(int orderId) {
   
     
        return new Order(orderId);
    }
}

class Invoice {
   
     
    // TODO
    public Invoice(Order order, Customer customer, String invoiceTemplate) {
   
     

    }

    public static Invoice generate(Order order, Customer customer, String invoiceTemplate) {
   
     
        return null;
    }
}

class Order {
   
     
    private int id;

    public Order(int orderId) {
   
     
        this.id = orderId;
    }
}

class Customer {
   
     
    private int id;

    public Customer(int customerId) {
   
     
        this.id = customerId;
    }
}