
在多线程编程中,一个常见的场景是多个线程需要从一个共享的任务池中获取并执行任务。例如,一个线程池有固定数量的线程,而任务列表中的任务数量可能远超线程数。理想情况下,当一个线程完成当前任务后,应立即从列表中获取下一个未被占用的任务并继续执行,以最大化资源利用率。手动管理这种任务分配和线程同步会非常复杂且容易出错,例如,一个任务在被线程A获取后,如何确保线程B不会重复获取,以及如何高效地通知空闲线程有新任务可取。
Java的java.util.concurrent包提供了一套强大的并发工具,其中ExecutorService是管理线程池和任务调度的首选。它抽象了线程的创建、管理和任务的提交过程,极大地简化了并发编程。
ExecutorService接口提供了提交任务(Runnable或Callable)的方法,并能自动将这些任务分配给线程池中的可用线程。当一个线程完成其当前任务后,ExecutorService会自动从其内部的任务队列中取出下一个待执行的任务分配给该线程。这正是我们所需的高效任务分发机制。
假设我们有一个字符串列表,每个字符串代表一个需要执行的任务。我们可以将每个字符串包装成一个Runnable任务,然后提交给ExecutorService。
立即学习“Java免费学习笔记(深入)”;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TaskProcessor {
public static void main(String[] args) {
// 定义任务列表
List<String> tasks = Arrays.asList(
"firstTask", "secondTask", "thirdTask", "fourthTask", "fifthTask",
"sixthTask", "seventhTask", "eighthTask", "ninthTask", "tenthTask"
);
// 创建一个固定大小的线程池,例如3个线程
ExecutorService executor = Executors.newFixedThreadPool(3);
System.out.println("--- 开始提交任务 ---");
// 遍历任务列表,将每个任务提交给ExecutorService
for (String taskName : tasks) {
// 将每个字符串任务封装为一个Runnable
Runnable worker = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 正在处理任务: " + taskName);
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " 完成任务: " + taskName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
System.err.println(Thread.currentThread().getName() + " 任务 " + taskName + " 被中断。");
}
}
};
executor.submit(worker); // 提交任务
}
System.out.println("--- 所有任务已提交,等待执行完成 ---");
// 关闭ExecutorService,等待所有已提交任务完成
executor.shutdown(); // 不再接受新任务
try {
// 等待所有任务在指定时间内完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("部分任务未在规定时间内完成,强制关闭。");
executor.shutdownNow(); // 强制关闭所有正在执行的任务
}
} catch (InterruptedException e) {
System.err.println("等待任务完成时被中断。");
executor.shutdownNow();
}
System.out.println("--- 所有任务执行完毕 ---");
}
}在上述示例中:
ExecutorService内部通常包含一个BlockingQueue来存储待执行的任务。当调用submit()方法时,任务被放入这个队列。线程池中的工作线程会不断地从这个队列中take()(阻塞地获取)任务。一旦获取到任务,线程就执行它。当任务完成时,线程会再次尝试从队列中获取下一个任务。这种机制天然地实现了任务的公平分配和线程的高效利用。
虽然ExecutorService是处理此类问题的最佳实践,但在某些特定场景下,或者为了更深入理解其底层机制,我们也可以直接使用BlockingQueue来实现类似的任务分发逻辑。BlockingQueue是java.util.concurrent包中的一个接口,它支持在检索元素时阻塞,或者在队列满时阻塞添加元素。
BlockingQueue是线程安全的,常用于实现生产者-消费者模式。生产者线程将任务放入队列,消费者线程从队列中取出任务。
以下是一个使用BlockingQueue实现任务分发的基本框架。这比ExecutorService更底层,需要手动管理线程的启动和关闭。
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ManualTaskProcessor {
private static final int NUM_THREADS = 3;
private static final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
private static final AtomicInteger tasksCompleted = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
List<String> tasks = Arrays.asList(
"firstTask", "secondTask", "thirdTask", "fourthTask", "fifthTask",
"sixthTask", "seventhTask", "eighthTask", "ninthTask", "tenthTask"
);
// 生产者:将所有任务放入队列
for (String taskName : tasks) {
taskQueue.put(taskName); // put() 方法会阻塞直到有空间可用
}
// 消费者:创建并启动工作线程
Thread[] workerThreads = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
workerThreads[i] = new Thread(() -> {
try {
while (true) {
// take() 方法会阻塞直到队列中有元素可用
String task = taskQueue.take();
if ("POISON_PILL".equals(task)) { // 毒丸机制,用于优雅关闭线程
taskQueue.put(task); // 将毒丸放回队列,让其他线程也能收到
break;
}
System.out.println(Thread.currentThread().getName() + " 正在处理任务: " + task);
Thread.sleep((long) (Math.random() * 1000)); // 模拟任务执行
System.out.println(Thread.currentThread().getName() + " 完成任务: " + task);
tasksCompleted.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println(Thread.currentThread().getName() + " 被中断。");
}
}, "Worker-" + (i + 1));
workerThreads[i].start();
}
// 等待所有任务完成
while (tasksCompleted.get() < tasks.size()) {
Thread.sleep(100); // 短暂等待
}
// 发送“毒丸”信号,通知所有工作线程退出
for (int i = 0; i < NUM_THREADS; i++) {
taskQueue.put("POISON_PILL");
}
// 等待所有工作线程结束
for (Thread thread : workerThreads) {
thread.join();
}
System.out.println("--- 所有任务执行完毕 ---");
}
}这个例子展示了BlockingQueue如何作为任务队列工作。生产者(主线程)将任务放入队列,消费者(工作线程)从队列中取出任务。为了实现优雅关闭,我们使用了“毒丸”机制,即在所有实际任务提交完毕后,向队列中放入特殊标记(如“POISON_PILL”),工作线程遇到此标记时退出循环。
当面临多个线程需要从一个共享任务列表中动态获取并执行任务的场景时,ExecutorService是Java中最推荐和最强大的解决方案。它通过其内置的线程池管理和任务调度机制,极大地简化了并发编程,避免了手动实现复杂同步逻辑的陷阱。虽然BlockingQueue提供了实现类似机制的底层能力,但在大多数情况下,直接使用ExecutorService能够提供更高的效率、更好的可维护性和更少的错误。正确地理解和使用ExecutorService是编写高效、健壮并发应用程序的关键。
以上就是Java多线程任务调度:共享任务列表的高效处理策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号