
在现代java应用开发中,completablefuture是处理异步操作的强大工具。然而,当面临一系列需要严格顺序执行的异步任务,并且需要将每个任务的结果收集起来时,开发者可能会遇到挑战。尤其当每个异步任务本身就返回一个completionstage时,如何正确地链式调用并避免不必要的线程阻塞或并发问题,是实现高效异步流程的关键。
假设我们有一个耗时业务处理函数 process(int a),它返回一个 CompletionStage<Integer>:
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CompletableFutureSequential {
private CompletionStage<Integer> process(int a) {
return CompletableFuture.supplyAsync(() -> {
System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a);
// 模拟长时间运行的业务过程
try {
Thread.sleep(10); // 增加延迟以观察顺序性
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return a + 10;
}).whenCompleteAsync((e, t) -> {
if (t != null)
System.err.printf("!!! error processing '%d' !!!\n", a);
System.err.printf("%s finish %d\n", LocalDateTime.now(), e);
});
}
// ... (后续解决方案代码将放在这里)
}我们的目标是按顺序执行一系列 process 调用,并将它们的整数结果收集到一个 List<Integer> 中。
常见误区与问题:
在 thenApplyAsync 内部使用 join():
立即学习“Java免费学习笔记(深入)”;
// 第一次尝试(成功但效率低下)
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
CompletionStage<List<Integer>> resultStage1 = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
resultStage1 = resultStage1.thenApplyAsync((ret) -> {
// 在异步回调中阻塞等待另一个CompletableFuture完成
Integer a = process(element).toCompletableFuture().join();
ret.add(a);
return ret;
});
}
List<Integer> computeResult1 = resultStage1.toCompletableFuture().join();
// 这种方法虽然能实现顺序执行,但 `join()` 的使用意味着在 `thenApplyAsync` 的执行线程中会发生阻塞,
// 导致一个阶段的执行可能占用两个线程资源(一个用于 `thenApplyAsync`,另一个用于 `process` 内部的 `supplyAsync`,
// 且 `thenApplyAsync` 的线程会等待 `process` 完成),效率不高且不符合异步编程的最佳实践。这种方式虽然实现了顺序性,但 join() 是一个阻塞操作。在 thenApplyAsync 的回调中调用 join() 会导致该回调所在的线程被阻塞,直到 process(element) 完成。这违背了异步编程的非阻塞原则,并且可能导致线程池资源被低效利用。
使用 thenCombineAsync 进行链式调用:
// 第二次尝试(失败,因为是并行执行)
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
CompletionStage<List<Integer>> resultStage2 = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
// thenCombineAsync 会尝试并行执行两个CompletionStage
resultStage2 = resultStage2.thenCombineAsync(process(element), (array, ret) -> {
array.add(ret);
return array;
});
}
// resultStage2.toCompletableFuture().join();
// 这种方法会导致 `process(element)` 几乎同时被调度执行,
// 因为 `thenCombineAsync` 的设计目的是在两个 CompletionStage 都完成后,将它们的结果合并。
// 这与我们要求的“顺序执行”相悖。thenCombineAsync 的作用是等待两个独立的 CompletionStage 都完成后,再将它们的结果合并。这意味着 process(element) 会在循环迭代时被立即触发,而不是等待前一个 process 完成。因此,它无法保证任务的顺序执行。
thenCompose 是 CompletionStage 中用于顺序执行异步操作的关键方法。它接收一个函数,该函数会返回一个新的 CompletionStage。当当前的 CompletionStage 完成后,thenCompose 会使用其结果来触发并等待这个新的 CompletionStage 完成,从而有效地“扁平化”了嵌套的 CompletionStage。
这种方法通过一个外部的 List 来收集结果。我们初始化一个表示“前一个阶段已完成”的 CompletionStage<Void>,然后循环地将新的 process 任务链接到它后面。
public class CompletableFutureSequential {
// ... (process 方法同上)
public static void main(String[] args) {
CompletableFutureSequential app = new CompletableFutureSequential();
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
System.out.println("--- 方案一:使用外部列表累积结果 ---");
CompletionStage<Void> loopStage = CompletableFuture.completedFuture(null);
final List<Integer> resultList = new ArrayList<>(); // 外部列表
for (Integer element : arr) {
loopStage = loopStage
// 当 loopStage 完成后,执行 process(element)
.thenCompose(v -> app.process(element))
// 当 process(element) 完成后,将其结果添加到 resultList
.thenAccept(resultList::add);
}
// 阻塞等待所有任务完成
loopStage.toCompletableFuture().join();
System.out.println("方案一结果: " + resultList);
// 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19]
}
}原理分析:
这种方法更加函数式,它将结果列表作为 CompletionStage 的结果在链中传递和更新。
public class CompletableFutureSequential {
// ... (process 方法同上)
public static void main(String[] args) {
// ... (方案一代码,省略以聚焦方案二)
System.out.println("\n--- 方案二:在 CompletionStage 链中传递列表 ---");
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
CompletionStage<List<Integer>> listStage = CompletableFuture.completedFuture(new ArrayList<>()); // 初始列表作为结果
for (Integer element : arr) {
listStage = listStage
// 当 listStage (包含当前列表) 完成后,执行 process(element)
.thenCompose(list -> app.process(element)
// 当 process(element) 完成后,将结果添加到传入的 list
.thenAccept(list::add)
// 关键:将更新后的 list 作为下一个 CompletionStage 的结果返回
.thenApply(v -> list)
);
}
List<Integer> resultList2 = listStage.toCompletableFuture().join();
System.out.println("方案二结果: " + resultList2);
// 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19]
}
}原理分析:
线程管理:
错误处理:
阻塞操作:join() 与 get():
选择方案:
通过理解和应用 thenCompose,开发者可以有效地构建复杂、顺序执行的异步任务流,同时保持代码的清晰性和响应性。
以上就是Java CompletableFuture 链式顺序执行与结果列表收集教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号