
本文旨在解决在使用 Java 客户端从 Google Pub/Sub 拉取消息时遇到的高延迟问题。通过分析同步拉取模式的局限性,并提供异步流式拉取的替代方案,帮助开发者优化消息拉取效率,降低延迟,从而更有效地处理大量消息。
在使用 Google Pub/Sub 时,开发者可能会遇到从主题中拉取消息时延迟较高的问题,尤其是在处理大量消息时。本文将探讨如何优化 Java 客户端的配置,以减少消息拉取延迟,提高吞吐量。
通常情况下,开发者会采用同步拉取模式,设置 maxMessages 参数来控制每次请求拉取的消息数量。然而,即使设置了较大的 maxMessages 值,实际拉取到的消息数量可能远小于预期。这主要是因为 Pub/Sub 服务在吞吐量和延迟之间进行权衡,倾向于快速返回部分消息,而不是等待收集到最大数量的消息。
问题示例代码:
立即学习“Java免费学习笔记(深入)”;
public List<ReceivedMessage> getMessagesFromSubscription(String projectId, String subscriptionId, int numOfMessages,
CredentialsProvider credentialsProvider) {
List<ReceivedMessage> receivedMessages = new ArrayList<>();
try {
SubscriberStubSettings subscriberStubSettings = getSubscriberStubSettings(credentialsProvider);
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(100)
.setSubscription(subscriptionName)
.build();
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
ackIds.add(message.getAckId());
ModifyAckDeadlineRequest modifyAckDeadlineRequest = ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscriptionName)
.addAckIds(message.getAckId())
.setAckDeadlineSeconds(30)
.build();
subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
}
if (ackIds.isEmpty()) {
// my logic
} else {
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
receivedMessages = new ArrayList<>(pullResponse.getReceivedMessagesList());
}
}
LOG.info("getMessagesFromSubscription: Received {} Messages for Project Id: {} and" +
" Subscription Id: {}.", receivedMessages.size(), projectId, subscriptionId);
} catch (Exception e) {
LOG.error("getMessagesFromSubscription: Error while pulling message from Pub/Sub " +
"from Project ID: {} and Subscription ID: {}", projectId, subscriptionId, e);
}
return receivedMessages;
}
private SubscriberStubSettings getSubscriberStubSettings(CredentialsProvider credentialsProvider) throws IOException {
SubscriberStubSettings.Builder subscriberStubSettingsBuilder = SubscriberStubSettings
.newBuilder()
.setTransportChannelProvider(SubscriberStubSettings
.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20)
.build());
if (credentialsProvider != null) {
subscriberStubSettingsBuilder = subscriberStubSettingsBuilder.setCredentialsProvider(credentialsProvider);
}
return subscriberStubSettingsBuilder.build();
}上述代码展示了同步拉取消息的典型实现。虽然设置了 maxMessages 为 100,但实际每次拉取的消息数量可能远小于这个值,导致整体延迟增加。
为了最大限度地提高吞吐量并降低延迟,建议使用异步流式拉取模式。异步拉取允许同时发出多个拉取请求,从而使 Pub/Sub 服务能够更好地利用资源,并根据订阅者的处理能力调整消息发送速率。
异步流式拉取的优势:
实现异步流式拉取的步骤:
示例代码 (简要说明,完整代码请参考官方文档):
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
// ...
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.println("Listening for messages on " + subscriptionName.toString());
// Keep the main thread alive to allow message processing.
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
// Handle error
subscriber.stopAsync();
}注意事项:
通过采用异步流式拉取模式,可以显著降低 Google Pub/Sub 消息拉取的延迟,提高吞吐量,从而更有效地处理大量消息。在实际应用中,需要根据具体的业务场景和性能需求,选择合适的拉取模式和配置参数,以达到最佳的性能表现。同时,监控和调优是持续的过程,需要不断地根据实际情况进行调整。
以上就是优化 Google Pub/Sub 拉取消息延迟:Java 客户端性能调优指南的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号