要使用java构建websocket服务端实现实时推送,可通过spring boot快速搭建,其核心步骤包括添加依赖、配置websocket和实现处理器。1. 在pom.xml中添加spring-boot-starter-websocket依赖;2. 创建配置类websocketconfig并启用websocket支持,注册处理器并设置允许的来源;3. 实现mytextwebsockethandler处理器,继承textwebsockethandler并重写连接建立、消息处理、连接关闭等方法,使用copyonwritearrayset管理会话实现广播,或使用concurrenthashmap实现定向推送;4. websocket相较于http轮询和长轮询具有全双工通信、低开销、高实时性等优势;5. 高并发场景下需解决连接管理、i/o瓶颈、跨实例会话管理、认证授权等问题,可采用异步处理、消息队列、redis共享会话、jwt令牌等方案。

Java构建WebSocket服务端来实现实时推送,说白了,就是建立一个持久化的双向通信通道,让服务器能够主动、即时地把数据推送到客户端,而不是客户端反复去问“有新消息吗?”。这对于聊天应用、股票行情、实时通知或者任何需要即时数据更新的场景来说,几乎是唯一的选择。它避免了传统HTTP轮询的低效和长轮询的复杂性,提供了一种更优雅、更高效的解决方案。

在Java生态中,搭建WebSocket服务端,Spring Boot无疑是当下最便捷、功能最完善的选择之一。我们通常会利用spring-boot-starter-websocket这个模块来快速构建。
首先,在你的pom.xml中加入依赖:
立即学习“Java免费学习笔记(深入)”;

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>接着,你需要配置WebSocket。创建一个配置类,启用WebSocket支持:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyTextWebSocketHandler(), "/ws")
.setAllowedOrigins("*"); // 允许所有来源,生产环境应限制特定域名
}
}然后,就是实现你的WebSocket处理器。你可以继承TextWebSocketHandler来处理文本消息,或者BinaryWebSocketHandler来处理二进制消息。这里以文本消息为例:

import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
public class MyTextWebSocketHandler extends TextWebSocketHandler {
// 用来存放所有在线的WebSocketSession
private static final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
System.out.println("新连接建立: " + session.getId() + ", 当前在线人数: " + sessions.size());
session.sendMessage(new TextMessage("欢迎连接到WebSocket服务!"));
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("收到来自 " + session.getId() + " 的消息: " + message.getPayload());
// 可以将消息广播给所有客户端
broadcastMessage(message.getPayload());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
System.out.println("连接关闭: " + session.getId() + ", 状态: " + status.getCode() + ", 原因: " + status.getReason() + ", 当前在线人数: " + sessions.size());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.err.println("传输错误: " + session.getId() + ", 错误信息: " + exception.getMessage());
sessions.remove(session);
}
// 广播消息给所有在线客户端
public static void broadcastMessage(String message) {
TextMessage textMessage = new TextMessage(message);
for (WebSocketSession session : sessions) {
try {
if (session.isOpen()) {
session.sendMessage(textMessage);
}
} catch (IOException e) {
System.err.println("发送消息到 " + session.getId() + " 失败: " + e.getMessage());
// 考虑移除发送失败的session,或者在afterConnectionClosed中统一处理
}
}
}
}现在,你的Spring Boot应用启动后,客户端就可以通过ws://localhost:8080/ws(如果你的端口是8080)连接到WebSocket服务了。客户端可以使用JavaScript的WebSocket API来连接和发送接收消息。
谈到实时推送,很多人会先想到HTTP轮询或长轮询,但WebSocket出来后,它们就显得有点“老旧”了。WebSocket的优势在于它的全双工通信能力和更低的协议开销。
传统HTTP轮询,说白了就是客户端每隔一段时间(比如1秒)就去问服务器“有新数据吗?”。这种方式简单粗暴,但效率极低,服务器大部分时间都在处理“没有新数据”的请求,浪费带宽和服务器资源。而且,消息的实时性取决于你设置的轮询间隔,间隔短了,资源消耗大;间隔长了,消息就不够“实时”。
长轮询稍微好一点,客户端发起请求后,服务器会“hold住”这个请求,直到有新数据或者超时才响应。客户端收到响应后会立即发起新的请求。这减少了空轮询的次数,但本质上还是基于HTTP请求-响应模型,每次通信都需要完整的HTTP头,开销不小。而且,服务器端维护这些“挂起”的请求也需要一定的资源。
WebSocket则完全不同。它在HTTP握手之后,会升级为一个持久化的TCP连接。一旦连接建立,客户端和服务器可以随时互相发送数据,是真正的双向通信。这意味着服务器可以主动“推送”消息给客户端,无需客户端发起请求。这种模式带来的好处显而易见的:
当然,WebSocket也并非万能药,它更适合需要高实时性、高频率数据交互的场景。对于那些数据更新不频繁,或者只需要单向获取信息的场景,传统的HTTP请求可能依然是更简单的选择。
在构建实时推送系统时,仅仅能够接收消息是不够的,核心在于如何将消息准确地送达给一个或多个客户端。这通常涉及到两种模式:广播(给所有连接的客户端)和定向推送(给特定的客户端)。
实现这两种推送模式,关键在于如何管理和访问这些活跃的WebSocketSession对象。在上面给出的示例代码中,我们用了一个CopyOnWriteArraySet<WebSocketSession>来存储所有在线的session。
1. 消息广播:
广播是最直接的方式。当有需要通知所有客户端的事件发生时(比如系统公告、全局更新),你只需要遍历所有当前活跃的WebSocketSession,然后逐一发送消息。
在MyTextWebSocketHandler中,broadcastMessage方法就是实现了这一点:
public static void broadcastMessage(String message) {
TextMessage textMessage = new TextMessage(message);
for (WebSocketSession session : sessions) { // 遍历所有session
try {
if (session.isOpen()) { // 确保session仍然是打开状态
session.sendMessage(textMessage);
}
} catch (IOException e) {
// 发送失败通常意味着客户端已断开,但连接状态更新滞后,
// 可以在这里记录日志,或者更健壮地处理(比如尝试移除这个session)
System.err.println("发送消息到 " + session.getId() + " 失败: " + e.getMessage());
}
}
}这里需要注意并发问题。CopyOnWriteArraySet在读多写少的场景下表现良好,因为它在写入时会复制底层数组,保证了迭代时的线程安全。但如果连接频繁建立和断开,或者在线人数巨大,它的性能可能会受到影响。
2. 定向推送:
定向推送就复杂一些,因为它需要识别“特定客户端”。这通常意味着你需要维护一个“用户ID”到“WebSocketSession”的映射关系。
例如,当用户登录成功并通过WebSocket连接时,你可以将用户的ID与对应的WebSocketSession关联起来:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
// ... 其他import
@Component
public class MyTextWebSocketHandler extends TextWebSocketHandler {
// 存储所有在线session,key可以是用户ID,value是WebSocketSession
private static final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 假设你可以从session中获取用户ID,例如通过认证信息或者URL参数
String userId = extractUserIdFromSession(session); // 这是一个你需要实现的方法
if (userId != null) {
userSessions.put(userId, session);
System.out.println("用户 " + userId + " 连接建立: " + session.getId());
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 在连接关闭时移除对应的session
userSessions.entrySet().removeIf(entry -> entry.getValue().equals(session));
System.out.println("连接关闭: " + session.getId() + ", 当前在线用户数: " + userSessions.size());
}
// 定向发送消息给某个用户
public static void sendMessageToUser(String userId, String message) {
WebSocketSession session = userSessions.get(userId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
System.out.println("向用户 " + userId + " 发送消息: " + message);
} catch (IOException e) {
System.err.println("发送消息到用户 " + userId + " 失败: " + e.getMessage());
}
} else {
System.out.println("用户 " + userId + " 不在线或会话已关闭。");
}
}
// 示例:如何从session中获取用户ID(实际情况可能通过认证令牌等)
private String extractUserIdFromSession(WebSocketSession session) {
// 实际应用中,这部分会涉及安全认证,例如从Spring Security的Principal中获取
// 或者从WebSocket握手时的HTTP头、URL参数中解析
// 简单示例:假设客户端连接时URL是 /ws?userId=xxx
String query = session.getUri() != null ? session.getUri().getQuery() : null;
if (query != null && query.contains("userId=")) {
return query.split("userId=")[1].split("&")[0];
}
return null;
}
}在实际应用中,extractUserIdFromSession这部分会非常关键,它通常涉及到与你的用户认证系统集成。你可能需要从WebSocket握手请求的HTTP头中获取JWT令牌,然后解析出用户ID。
定向推送的挑战在于,如果一个用户在不同设备或浏览器上建立了多个WebSocket连接,你可能需要一个Map<String, Set<WebSocketSession>>来存储一个用户ID对应的所有session,以便将消息推送到该用户的所有在线客户端。
当你的WebSocket服务需要支撑成千上万甚至百万级的并发连接时,一些潜在的挑战就会浮出水面,这需要我们在系统设计时就有所考量。
1. 连接管理与内存消耗:
每个WebSocket连接都会占用服务器一定的内存资源,包括会话对象、缓冲区等。在高并发下,这些累积的内存开销可能非常巨大。
ConcurrentHashMap,而不是同步锁或CopyOnWriteArraySet(在高写入场景下效率不高)。2. 消息处理与I/O瓶颈:
大量的消息发送和接收会产生大量的I/O操作。如果处理不当,可能导致线程阻塞,影响整体吞吐量。
3. 跨服务器实例的会话管理(集群部署):
当你的WebSocket服务需要进行水平扩展,部署多个实例时,如何确保消息能够推送到正确的客户端,无论它连接到哪个服务器实例,是一个核心问题。
4. 认证与授权:
在高并发下,每次连接的认证和后续消息的授权都需要高效执行。
WebSocketSession。后续消息处理时,直接使用会话中已认证的身份,避免重复认证。总的来说,构建高并发的Java WebSocket服务,不仅仅是写几行代码那么简单,它更像是一个系统工程,需要你对网络通信、并发编程、分布式系统有深入的理解,并结合实际业务场景进行权衡和设计。
以上就是如何使用Java搭建WebSocket服务端 Java构建实时推送系统的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号