
针对activemq artemis中,使用选择器浏览消息成功但消费者无法接收消息的偶发性问题,本文深入分析了其常见原因。通过对比jms客户端库(核心jms与openwire),揭示了该问题可能源于特定客户端与旧版broker之间的兼容性缺陷(如artemis-3916)。教程提供了详细的示例代码,并建议通过切换至核心jms客户端或升级broker版本来有效解决此问题,确保消息可靠处理。
在使用 ActiveMQ Artemis 进行消息队列开发时,开发者可能会遇到一个令人困惑的问题:通过消息选择器(Selector)可以成功浏览(Browse)到指定的消息,但尝试使用 MessageConsumer 接收(Receive)同一条消息时却失败,表现为 receive() 方法返回 null 或抛出异常。本文将深入探讨这一现象的潜在原因,并提供切实可行的解决方案。
在 ActiveMQ Artemis 2.18.0 版本中,结合 artemis-jms-client-all:2.18.0 客户端库,部分用户反馈在约万分之三的概率下,即使通过 QueueBrowser 和 JMSMessageID 选择器能够准确定位到消息,但随后的 MessageConsumer 却无法接收到该消息。以下是复现此问题的典型代码片段:
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Enumeration;
public class MessageReceiveFailureReproducer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "hospital";
public static void main(String[] args) {
// 假设 'id' 是通过某种方式获取到的消息ID
String messageIdToFind = "some-message-id-example"; // 替换为实际的消息ID
String selector = "JMSMessageID='" + messageIdToFind + "'";
Connection connection = null;
Session session = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue deadQueue = session.createQueue(QUEUE_NAME);
connection.start();
// 1. 使用 QueueBrowser 浏览消息
QueueBrowser browser = session.createBrowser(deadQueue, selector);
Enumeration<?> enumeration = browser.getEnumeration();
int foundedElements = 0;
while (enumeration.hasMoreElements()) {
Message message = (Message) enumeration.nextElement();
System.out.println("Browser found message with ID: " + message.getJMSMessageID());
foundedElements++;
}
browser.close();
if (foundedElements != 1) {
throw new IllegalStateException("Expected 1 message with selector, but found " + foundedElements);
}
// 2. 使用 MessageConsumer 尝试接收消息
MessageConsumer messageConsumer = session.createConsumer(deadQueue, selector);
Message receivedMessage = messageConsumer.receive(1000); // 设置超时1秒
if (receivedMessage == null) {
throw new IllegalStateException("MessageConsumer failed to receive message with ID: " + messageIdToFind);
} else {
System.out.println("Consumer successfully received message with ID: " + receivedMessage.getJMSMessageID());
}
messageConsumer.close();
session.commit(); // 提交事务
System.out.println("Transaction committed successfully.");
} catch (JMSException | RuntimeException e) {
System.err.println("An error occurred: " + e.getMessage());
try {
if (session != null) {
session.rollback(); // 回滚事务
System.out.println("Transaction rolled back.");
}
} catch (JMSException e1) {
System.err.println("Error during rollback: " + e1.getMessage());
}
throw new RuntimeException("Application failed", e);
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
System.err.println("Error closing session: " + e.getMessage());
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.err.println("Error closing connection: " + e.getMessage());
}
}
}
}
}上述代码旨在通过 JMSMessageID 选择器先确认消息存在,然后尝试接收。然而,在某些情况下,尽管 foundedElements 为 1,receivedMessage 却可能为 null,导致程序抛出 IllegalStateException。
经过深入测试和社区反馈,发现此问题通常与所使用的 JMS 客户端库类型及其与 ActiveMQ Artemis Broker 版本的兼容性有关。
当使用 OpenWire JMS 客户端库(例如,通过 artemis-jms-client-all 间接引入或显式依赖 activemq-client 等)连接 ActiveMQ Artemis Broker 2.18.0 版本时,很可能会触发 Apache Artemis 项目中的一个已知缺陷:ARTEMIS-3916。这个缺陷描述了在特定条件下,OpenWire 客户端在使用选择器时,MessageConsumer 可能无法正确匹配或接收到消息。
相比之下,如果使用 ActiveMQ Artemis 核心 JMS 客户端库(通常是 artemis-jms-client 或 artemis-core-client),则此类问题通常不会发生。这表明问题并非出在 Broker 本身的消息存储或选择器逻辑上,而是客户端与 Broker 之间通信协议或特定客户端实现的兼容性问题。
针对此问题,主要有两种推荐的解决方案:
这是最直接且推荐的解决方案。确保您的项目依赖使用的是 ActiveMQ Artemis 官方的核心 JMS 客户端库,而非 OpenWire 兼容客户端。
示例代码(使用核心 JMS 客户端):
以下代码演示了如何使用核心 JMS 客户端发送和接收消息,并验证其选择器功能。
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Enumeration;
import java.util.UUID;
public class CoreJMSClientExample {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "hospital";
private static final String TEST_MESSAGE_CONTENT = "This is a test message for Artemis.";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
try (Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue deadQueue = session.createQueue(QUEUE_NAME);
connection.start();
// 1. 发送一条带特定内容的测试消息
MessageProducer producer = session.createProducer(deadQueue);
TextMessage sentMessage = session.createTextMessage(TEST_MESSAGE_CONTENT);
producer.send(sentMessage);
session.commit(); // 提交发送事务
String sentMessageId = sentMessage.getJMSMessageID();
String selector = "JMSMessageID='" + sentMessageId + "'";
System.out.println("Sent message with ID: " + sentMessageId);
System.out.println("Using selector: " + selector);
// 2. 使用 QueueBrowser 浏览消息
QueueBrowser browser = session.createBrowser(deadQueue, selector);
Enumeration<?> enumeration = browser.getEnumeration();
int foundedElements = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
System.out.println("Browser found message with ID: " + msg.getJMSMessageID());
foundedElements++;
}
browser.close();
if (foundedElements != 1) {
throw new IllegalStateException("Expected 1 message with selector, but browser found " + foundedElements);
}
// 3. 使用 MessageConsumer 接收消息
MessageConsumer consumer = session.createConsumer(deadQueue, selector);
Message receivedMessage = consumer.receive(5000); // 5秒超时
if (receivedMessage == null) {
throw new IllegalStateException("MessageConsumer failed to receive message with ID: " + sentMessageId);
} else if (!(receivedMessage instanceof TextMessage) || !((TextMessage) receivedMessage).getText().equals(TEST_MESSAGE_CONTENT)) {
throw new IllegalStateException("Received message content does not match or is not a TextMessage.");
}
System.out.println("Consumer successfully received message with ID: " + receivedMessage.getJMSMessageID() +
" Content: " + ((TextMessage) receivedMessage).getText());
consumer.close();
session.commit(); // 提交接收事务
System.out.println("Transaction committed successfully after receiving.");
} catch (JMSException e) {
System.err.println("JMS Exception occurred: " + e.getMessage());
throw new RuntimeException("JMS operation failed", e);
} catch (Exception e) {
System.err.println("An unexpected error occurred: " + e.getMessage());
throw new RuntimeException("Application failed", e);
}
}
}依赖配置(Maven): 确保您的 pom.xml 中包含 ActiveMQ Artemis 核心 JMS 客户端依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.18.0</version> <!-- 与您的Broker版本匹配或更高 -->
</dependency>如果您之前使用了 artemis-jms-client-all 或 activemq-client(OpenWire客户端),请移除它们,并仅保留 artemis-jms-client。
如果无法更换客户端库(例如,由于历史遗留系统或第三方集成),那么升级 ActiveMQ Artemis Broker 是另一个有效的解决方案。ARTEMIS-3916 问题已在 ActiveMQ Artemis 2.25.0 及更高版本中得到修复。
推荐升级路径:
升级 Broker 版本通常需要进行充分的测试,以确保与现有应用程序的兼容性。
ActiveMQ Artemis 中使用选择器浏览成功但消费者接收失败的问题,通常是由于旧版 Broker 与 OpenWire JMS 客户端之间的兼容性缺陷(ARTEMIS-3916)所致。通过切换到 ActiveMQ Artemis 核心 JMS 客户端或升级 Broker 版本到 2.25.0 或更高,可以有效解决此问题,确保消息队列的稳定可靠运行。在生产环境中,始终建议使用最新稳定版本的软件,并进行充分的兼容性测试。
以上就是ActiveMQ Artemis:选择器浏览成功但消费者接收失败的解决方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号