
在异步编程中,CompletableFuture是处理并发任务的强大工具。然而,当面临需要严格顺序执行的异步任务链,并且需要收集每个任务的结果时,可能会遇到一些挑战。例如,业务场景可能要求前一个任务完成后,后一个任务才能开始,同时我们希望将所有任务的计算结果汇总到一个集合中。
考虑一个耗时的业务处理函数,它返回一个CompletionStage<Integer>:
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class SequentialTaskProcessor {
private CompletionStage<Integer> process(int a) {
return CompletableFuture.supplyAsync(() -> {
System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a);
// 模拟长时间运行的业务处理
try {
Thread.sleep(10); // 增加延迟以观察效果
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return a + 10;
}).whenCompleteAsync((e, t) -> {
if (t != null)
System.err.printf("!!! error processing '%d' !!!\n", a);
System.err.printf("%s finish %d\n", LocalDateTime.now(), e);
});
}我们的目标是多次调用process函数,确保它们按顺序执行,并将每次的结果收集到一个List<Integer>中。
一种直观的尝试是使用thenApplyAsync并在其内部调用process(element).toCompletableFuture().join()。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// ... (process方法同上)
public void firstApproach() {
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
result = result.thenApplyAsync((ret) -> {
// 在thenApplyAsync内部阻塞等待前一个CompletableFuture完成
Integer a = process(element).toCompletableFuture().join();
ret.add(a);
return ret;
});
}
List<Integer> computeResult = result.toCompletableFuture().join();
System.out.println("First approach results: " + computeResult);
}问题分析: 虽然这种方法能够实现顺序执行并收集结果,但它效率低下。thenApplyAsync本身会在一个线程池中执行其回调,而回调内部的process(element).toCompletableFuture().join()又会阻塞这个线程,直到process方法返回的CompletableFuture完成。这意味着一个逻辑步骤可能间接占用两个线程资源(一个用于thenApplyAsync的回调,另一个用于process内部的异步任务),造成线程资源的浪费和不必要的阻塞。观察输出日志,会发现dispatch和finish的时间戳是严格顺序的,但线程利用率不高。
另一种尝试是使用thenCombineAsync,期望它能将前一个阶段的结果与新任务的结果结合:
// ... (process方法同上)
public void secondApproach() {
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
// process(element) 在这里被立即调用,而非等待前一个阶段完成
result = result.thenCombineAsync(process(element), (array, ret) -> {
array.add(ret);
return array;
});
}
List<Integer> computeResult = result.toCompletableFuture().join();
System.out.println("Second approach results: " + computeResult);
}问题分析: 这种方法会导致任务并发执行,而非顺序执行。thenCombineAsync的第二个参数CompletionStage<U> other在方法调用时就会被评估并启动。这意味着在循环中,所有的process(element)调用几乎是同时发起的,它们会并发执行。观察输出日志,会发现dispatch的时间戳是交错的,这违反了顺序执行的要求。thenCombineAsync适用于两个独立的异步任务都完成后再进行合并的场景,而不是链式顺序执行的场景。
为了实现任务的顺序执行并高效地收集结果,我们需要利用CompletableFuture提供的更高级的组合方法,特别是thenCompose。
这种方法通过thenCompose确保任务顺序执行,并使用thenAccept将结果添加到循环外部维护的列表中。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// ... (process方法同上)
public class SequentialTaskProcessor {
// ... process 方法 ...
public void solutionOne() {
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
// 初始化一个表示链式操作开始的CompletableFuture,其结果类型为Void
CompletionStage<Void> loopStage = CompletableFuture.completedFuture(null);
final List<Integer> resultList = new ArrayList<>(); // 外部结果列表
for (Integer element : arr) {
loopStage = loopStage
// thenCompose确保前一个阶段完成后,才执行process(element)
.thenCompose(v -> process(element))
// thenAccept将process的结果添加到外部列表中,并返回CompletionStage<Void>
.thenAccept(resultList::add);
}
// 阻塞等待所有任务完成
loopStage.toCompletableFuture().join();
System.out.println("Solution One results: " + resultList);
}
public static void main(String[] args) {
SequentialTaskProcessor processor = new SequentialTaskProcessor();
System.out.println("--- Running Solution One ---");
processor.solutionOne();
System.out.println("\n--- Running Solution Two ---");
processor.solutionTwo();
}
}原理详解:
这种方法简洁且高效,避免了不必要的阻塞和线程浪费。
另一种方法是在CompletableFuture链中直接传递并累积结果列表。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// ... (process方法同上)
public class SequentialTaskProcessor {
// ... process 方法 ...
public void solutionTwo() {
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
// 初始化一个携带空列表的CompletableFuture
CompletionStage<List<Integer>> listStage = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
listStage = listStage
// thenCompose确保前一个阶段完成后,才执行process(element)
.thenCompose(list -> process(element)
// thenAccept将process的结果添加到当前列表
.thenAccept(list::add)
// thenApply将CompletionStage<Void>转换回CompletionStage<List<Integer>>
.thenApply(v -> list)
);
}
// 阻塞等待所有任务完成,并获取最终的列表
List<Integer> resultList = listStage.toCompletableFuture().join();
System.out.println("Solution Two results: " + resultList);
}
// ... main 方法 ...
}原理详解:
两种解决方案都能够有效地实现异步任务的顺序执行和结果收集,并且都避免了线程阻塞和并发执行的问题。
注意事项:
通过理解thenCompose的扁平化特性和thenAccept/thenApply的组合使用,我们可以更灵活、高效地构建复杂的异步任务流,满足各种顺序执行和结果收集的需求。
以上就是深入理解CompletableFuture:实现任务的顺序执行与结果收集的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号