
在企业级应用中,从数据库批量获取数据是常见需求。然而,数据库通常对单次查询接受的参数数量有限制(例如,SQL IN 子句的参数数量)。因此,我们经常需要将一个大的键列表分割成多个小批次,然后对每个批次执行查询。
原始代码示例展示了这种场景,其中一个包含5000个数字的列表被分割成多个大小为500的子列表,然后对每个子列表执行数据库查询。
AtomicInteger counter = new AtomicInteger();
List<Cat> catList = new ArrayList<>(); // 外部可变列表
List<Dog> dogList = new ArrayList<>(); // 外部可变列表
List<Integer> numbers = Stream.iterate(1, e -> e + 1)
.limit(5000)
.collect(Collectors.toList());
Collection<List<Integer>> partitionedListOfNumbers = numbers.stream()
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values(); // 将列表分割成大小为500的子列表
partitionedListOfNumbers.stream()
.forEach(list -> {
List<Cat> interimCatList = catRepo.fetchCats(list); // 从数据库获取Cat
catList.addAll(interimCatList); // 修改外部 catList
List<Dog> interimDogList = dogRepo.fetchDogs(list); // 从数据库获取Dog
dogList.addAll(interimDogList); // 修改外部 dogList
});上述代码的核心问题在于其使用了forEach操作,并在其中通过catList.addAll(interimCatList)和dogList.addAll(interimDogList)直接修改了外部的catList和dogList。这种模式被称为“共享可变性”(Shared Mutability)。
共享可变性带来了多方面的问题:
立即学习“Java免费学习笔记(深入)”;
为了构建更健壮、更易于测试和并发友好的代码,我们应该尽量避免共享可变性。
Java 8 引入的 Stream API 提供了一种声明式、函数式的方式来处理集合数据。通过利用 map、flatMap 和 collect 等操作,我们可以在不修改外部状态的情况下转换和聚合数据。
为了消除共享可变性,我们将重构代码,使其不再使用forEach来修改外部列表,而是利用Stream的管道操作来生成新的结果列表。
核心思路:
以下是重构后的代码示例:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
// 模拟数据库仓库接口和实体类
class Cat {
private int id;
private String name;
public Cat(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Cat{id=" + id + ", name='" + name + "'}"; }
}
class Dog {
private int id;
private String name;
public Dog(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Dog{id=" + id + ", name='" + name + "'}"; }
}
class CatRepository {
public List<Cat> fetchCats(List<Integer> keys) {
// 模拟数据库查询
System.out.println("Fetching Cats for keys: " + keys.size() + " elements, first: " + keys.get(0));
return keys.stream()
.map(id -> new Cat(id, "Cat_" + id))
.collect(Collectors.toList());
}
}
class DogRepository {
public List<Dog> fetchDogs(List<Integer> keys) {
// 模拟数据库查询
System.out.println("Fetching Dogs for keys: " + keys.size() + " elements, first: " + keys.get(0));
return keys.stream()
.map(id -> new Dog(id, "Dog_" + id))
.collect(Collectors.toList());
}
}
public class BatchProcessingRefactor {
public static void main(String[] args) {
CatRepository catRepo = new CatRepository();
DogRepository dogRepo = new DogRepository();
int totalNumbers = 5000;
int batchSize = 500;
// AtomicInteger 用于在 groupingBy 中生成分组键
// 它本身是可变的,但其作用是帮助创建不可变的子集合
AtomicInteger counter = new AtomicInteger();
// 1. 数据分批:将 1 到 5000 的数字分割成大小为 500 的子列表
// IntStream.rangeClosed(1, totalNumbers) 生成一个从1到totalNumbers的整数流
// boxed() 将 IntStream 转换为 Stream<Integer>
Collection<List<Integer>> partitionedListOfNumbers = IntStream.rangeClosed(1, totalNumbers)
.boxed()
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / batchSize))
.values();
System.out.println("Total partitions: " + partitionedListOfNumbers.size());
// 2. 处理 Cat 数据:使用 Stream API 避免共享可变性
// partitionedListOfNumbers.stream() 创建一个包含 List<Integer> 的流
// .map(catRepo::fetchCats) 将每个 List<Integer> 映射为一个 List<Cat>
// 此时流的类型是 Stream<List<Cat>>
// .flatMap(List::stream) 将 Stream<List<Cat>> 扁平化为 Stream<Cat>
// 即将所有 List<Cat> 中的 Cat 对象合并到一个单一的流中
// .collect(Collectors.toList()) 将 Stream<Cat> 中的所有 Cat 对象收集到一个新的 List<Cat> 中
List<Cat> catList = partitionedListOfNumbers.stream()
.map(catRepo::fetchCats)
.flatMap(List::stream)
.collect(Collectors.toList());
// 3. 处理 Dog 数据:同样的方式
List<Dog> dogList = partitionedListOfNumbers.stream()
.map(dogRepo::fetchDogs)
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("Fetched " + catList.size() + " cats.");
System.out.println("Fetched " + dogList.size() + " dogs.");
// 验证部分数据
// catList.stream().limit(5).forEach(System.out::println);
// dogList.stream().skip(4995).forEach(System.out::println);
}
}优势:
注意事项:
通过采用 Java Stream API 的 map、flatMap 和 collect 等操作,我们可以有效地重构批量数据处理代码,从而避免共享可变性。这种方法不仅提升了代码的线程安全性、可读性和可维护性,还使其更符合现代函数式编程的理念。在处理大量数据或构建高并发系统时,优先考虑这种不可变的数据处理模式将带来显著的优势。
以上就是Java Stream API:重构批量数据处理以避免共享可变性的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号