
java并发消息发送系统中的会话管理与`wait`/`notify`机制深度解析。本文将探讨如何利用java的`wait`/`notify`机制在多线程环境中实现短信批量发送与会话重连。我们将分析常见的同步问题,特别是因不当的`isempty()`检查和共享资源访问导致的`arrayindexoutofboundsexception`,并提供正确同步共享资源和管理线程状态的策略,以构建健壮的并发操作。
在企业级应用中,批量发送短信(或其他消息)是一个常见需求。为了提高吞吐量,通常会采用多线程并发发送的策略。然而,消息发送依赖于与外部服务器(如SMSC)建立的会话(SMPPSession)。这种会话可能因网络波动、服务器重启等原因中断,此时需要一个机制来重新建立会话,并在会话重连期间暂停所有发送操作,待会话恢复后再继续。
本教程将深入探讨如何使用Java的Object.wait()和Object.notifyAll()机制来协调多个消息发送线程和一个会话管理线程,以实现上述功能。我们将分析在并发场景下可能遇到的同步问题,并提供一套健壮的解决方案。
wait()和notify()(或notifyAll())是Java中用于线程间协作的基础机制,它们允许线程在特定条件下暂停执行并等待,直到另一个线程通知它条件满足。
关键点:
立即学习“Java免费学习笔记(深入)”;
原始代码尝试使用Client.messages列表作为监视器对象来协调线程。然而,其中存在几个关键的同步问题,导致了ArrayIndexOutOfBoundsException和线程不同步。
在Sender和SessionProducer线程的run()方法中,外部的while (!Client.messages.isEmpty())循环条件是在synchronized (Client.messages)块外部检查的。
// Sender线程示例
while (!Client.messages.isEmpty()){ // 问题:在同步块外检查
synchronized (Client.messages){
// ...
}
}问题分析: 假设Client.messages中只剩一条消息。多个Sender线程可能同时执行到while (!Client.messages.isEmpty()),它们都发现列表不为空,然后都尝试进入synchronized (Client.messages)块。当第一个线程进入同步块并成功移除消息后,列表变为空。此时,后续进入同步块的线程在执行Client.messages.remove(0)时,就会因为列表已空而抛出ArrayIndexOutOfBoundsException。
即使isEmpty()检查放在同步块内,remove(0)操作也需要谨慎。如果多个线程同时尝试移除,且消息数量不足,仍然可能导致问题。在CopyOnWriteArrayList中,remove(0)本身是线程安全的,但它并不能阻止在列表为空时尝试移除。
Sender线程在成功发送消息后调用了Client.messages.notifyAll()。然而,此时SessionProducer线程可能正在等待会话断开,或者其他Sender线程可能正在等待消息或会话。这种通知通常是不必要的,并且可能导致不必要的线程唤醒,甚至掩盖真正的等待条件。
原始代码中的wait()没有放在while循环中检查条件。
// 原始Sender线程的等待逻辑
} else {
try {
Client.messages.wait(); // 问题:没有在while循环中检查条件
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}这可能导致线程在被唤醒后,其等待的条件(例如smppSession.isBind()为true或Client.messages不为空)实际上并未满足,从而导致逻辑错误或再次进入等待状态。
为了解决上述问题,我们需要对代码进行重构,遵循以下核心原则:
SMPPSession作为共享资源,其bind状态是所有线程关注的焦点。我们可以将它作为监视器对象。
public class SMPPSession {
private boolean bind = false; // 初始状态为未绑定
private static final Random idGenerator = new Random();
public synchronized int sendMessage(String msg) { // 保持sendMessage同步
try {
Thread.sleep(100L); // 模拟发送延迟
System.out.println("Sending message: " + msg);
return Math.abs(idGenerator.nextInt());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Message sending interrupted: " + e.getMessage());
}
return -1;
}
public synchronized void reBind() { // reBind方法也同步
try {
System.out.println("Rebinding...");
Thread.sleep(2000L); // 模拟重连延迟
this.bind = true;
System.out.println("Session established!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Rebinding interrupted: " + e.getMessage());
}
}
public synchronized boolean isBind() { // isBind方法也同步
return this.bind;
}
public synchronized void setBind(boolean bind) { // 允许外部设置绑定状态
this.bind = bind;
}
}Sender线程需要等待两个条件:SMPPSession已绑定,且消息队列不为空。
import java.util.concurrent.CopyOnWriteArrayList;
public class Sender extends Thread {
private SMPPSession smppSession;
private CopyOnWriteArrayList<String> messages; // 引用共享消息列表
private volatile boolean running = true; // 控制线程生命周期
public Sender(String name, SMPPSession smppSession, CopyOnWriteArrayList<String> messages) {
this.setName(name);
this.smppSession = smppSession;
this.messages = messages;
}
public void terminate() {
this.running = false;
// 确保线程不会无限等待,如果正在wait(),需要被中断或notify
synchronized (smppSession) {
smppSession.notifyAll();
}
}
@Override
public void run() {
while (running) {
synchronized (smppSession) { // 使用smppSession作为监视器
// 等待条件:会话未绑定 或 消息队列为空
while (!smppSession.isBind() || messages.isEmpty()) {
// 如果消息已全部发送且会话已绑定,则此Sender可以退出
if (messages.isEmpty() && smppSession.isBind()) {
System.out.println(getName() + ":所有消息已发送完毕,线程退出。");
running = false; // 标记为停止
smppSession.notifyAll(); // 通知其他可能等待的线程
break; // 跳出内部while循环
}
try {
System.out.println(getName() + ":等待中... 会话绑定状态: " + smppSession.isBind() + ", 消息队列是否为空: " + messages.isEmpty());
smppSession.wait(); // 等待在smppSession对象上
} catch (InterruptedException e) {
System.out.println(getName() + ":被中断,线程退出。");
Thread.currentThread().interrupt();
running = false; // 标记为停止
break; // 跳出内部while循环
}
}
if (!running) { // 如果在等待过程中被标记为停止,则退出外部while循环
break;
}
// 条件满足:smppSession已绑定且messages不为空
final String msg = messages.remove(0); // 安全移除消息
final int msgId = smppSession.sendMessage(msg);
System.out.println(Thread.currentThread().getName() + " 发送消息并收到ID: " + msgId + "。剩余消息数:" + messages.size());
// 发送消息后,如果消息队列变空,可能需要通知其他Sender线程退出
// 或者如果Producer在等待消息队列状态,则需要通知
// 这里暂时不需要notifyAll,因为发送消息通常不改变Producer的等待条件
// 但如果messages.isEmpty()是Producer的等待条件之一,则需要
}
// 考虑在发送消息后短暂休眠,避免发送过快
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
running = false;
}
}
}
}SessionProducer线程负责在会话未绑定时进行重连,并在重连成功后通知所有Sender线程。
import java.util.concurrent.CopyOnWriteArrayList;
public class SessionProducer extends Thread {
private SMPPSession smppSession;
private CopyOnWriteArrayList<String> messages; // 引用共享消息列表,用于判断是否还有消息需要发送
private volatile boolean running = true;
public SessionProducer(String name, SMPPSession smppSession, CopyOnWriteArrayList<String> messages) {
this.setName(name);
this.smppSession = smppSession;
this.messages = messages;
}
public void terminate() {
this.running = false;
synchronized (smppSession) {
smppSession.notifyAll();
}
}
@Override
public void run() {
while (running) {
synchronized (smppSession) { // 使用smppSession作为监视器
// 如果会话已绑定,且所有消息都已发送完毕,则Producer可以退出
if (smppSession.isBind() && messages.isEmpty()) {
System.out.println(getName() + ":所有消息已发送完毕,会话已绑定,线程退出。");
running = false;
smppSession.notifyAll(); // 通知所有线程可以退出
break;
}
// 等待条件:会话已绑定 或 消息队列为空(如果 Producer 也需要关注消息队列状态)
// 这里主要关注会话绑定状态
while (smppSession.isBind() && !messages.isEmpty()) { // 如果会话已绑定且还有消息要发,Producer等待
try {
System.out.println(getName() + ":等待中... 会话已绑定,等待会话断开或所有消息发送完毕。");
smppSession.wait(); // 等待在smppSession对象上
} catch (InterruptedException e) {
System.out.println(getName() + ":被中断,线程退出。");
Thread.currentThread().interrupt();
running = false;
break;
}
}
if (!running) {
break;
}
// 此时,会话可能未绑定,或者消息队列为空(如果上面条件包含)
if (!smppSession.isBind()) { // 如果会话未绑定,则进行重连
smppSession.reBind();
System.out.println(Thread.currentThread().getName()以上就是Java并发消息发送系统中的会话管理与wait/notify机制深度解析的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号