
本文旨在解决在JUnit测试中直接模拟Google Cloud Pub/Sub客户端库(如`Publisher.newBuilder()`)时遇到的挑战。核心策略是引入一个抽象层(接口和实现),将外部服务调用封装起来,从而实现对业务逻辑的独立测试,并提升代码的可维护性和可测试性。
在编写单元测试时,我们通常希望隔离被测试代码与外部依赖(如数据库、网络服务、第三方库)。对于Google Cloud Pub/Sub的Java客户端库,直接模拟Publisher.newBuilder()等链式调用的静态或最终方法会遇到困难。这是因为Mockito等模拟框架主要针对接口或非最终类的方法进行模拟,而Publisher.Builder的设计模式使得其内部状态和构建过程难以在不修改源代码的情况下进行拦截和模拟。
考虑以下原始代码片段,它直接使用了Pub/Sub客户端库来发布消息:
public String publishJSON(String json, AppConfig config) throws InterruptedException, IOException, ExecutionException {
log.info(" Publishing payload to: "+config.getTopicId());
TopicName topicName=TopicName.of(config.getPubsubProjectId(),config.getTopicId());
Publisher publisher=null;
try {
publisher =
Publisher.newBuilder(topicName)
.build();
ByteString data = ByteString.copyFromUtf8(json);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
log.info("Published message ID: " + messageId);
return messageId;
} catch (ExecutionException e) {
log.error("Error while publishing messsage" + e.getMessage());
throw e;
} catch (IOException e) {
log.error( "PubSub exception "+ e.getMessage());
throw e;
} catch (InterruptedException e) {
log.error("Connection making exception for PubSub" + e.getMessage());
throw e;
} catch (Exception e) {
log.error( "publishJSON Error : "+ e.getMessage());
throw e;
}
finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}这段代码直接创建并管理Publisher实例。在单元测试中,我们不希望真正连接到Pub/Sub服务,因此需要一种方式来“假装”发布成功或失败。
为了有效测试依赖于外部服务的代码,最佳实践是引入一个抽象层。这意味着我们将直接调用外部库的代码封装在一个独立的类中,并通过接口对其进行抽象。然后,我们的业务逻辑代码将依赖于这个接口,而不是具体的实现。在测试时,我们可以轻松地模拟这个接口。
首先,定义一个接口来抽象Pub/Sub的发布操作。
package com.example.pubsub;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
/**
* PubSub消息发布服务的接口。
* 定义了向PubSub主题发布消息的核心操作。
*/
public interface PubSubPublisherService {
/**
* 发布一个JSON字符串作为PubSub消息。
*
* @param jsonPayload 要发布的JSON字符串。
* @return 发布成功的消息ID。
* @throws PubSubPublishException 如果发布过程中发生错误。
*/
String publishJsonMessage(String jsonPayload) throws PubSubPublishException;
/**
* 发布一个字节数组作为PubSub消息。
*
* @param data 要发布的字节数据。
* @return 发布成功的消息ID。
* @throws PubSubPublishException 如果发布过程中发生错误。
*/
String publishBytesMessage(byte[] data) throws PubSubPublishException;
}为了更好地处理发布过程中可能出现的各种异常,我们定义一个自定义异常类:
package com.example.pubsub;
/**
* PubSub发布操作的自定义异常。
*/
public class PubSubPublishException extends Exception {
public PubSubPublishException(String message) {
super(message);
}
public PubSubPublishException(String message, Throwable cause) {
super(message, cause);
}
}接下来,创建接口的具体实现,它将包含实际的Google Cloud Pub/Sub客户端调用逻辑。
package com.example.pubsub;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 默认的PubSubPublisherService实现,直接与Google Cloud Pub/Sub交互。
*/
public class DefaultPubSubPublisherService implements PubSubPublisherService {
private static final Logger log = LoggerFactory.getLogger(DefaultPubSubPublisherService.class);
private final TopicName topicName;
/**
* 构造函数,初始化PubSub发布服务的配置。
*
* @param projectId Google Cloud项目ID。
* @param topicId PubSub主题ID。
*/
public DefaultPubSubPublisherService(String projectId, String topicId) {
this.topicName = TopicName.of(projectId, topicId);
}
@Override
public String publishJsonMessage(String jsonPayload) throws PubSubPublishException {
if (jsonPayload == null || jsonPayload.isEmpty()) {
throw new IllegalArgumentException("JSON payload cannot be null or empty.");
}
return publishBytesMessage(ByteString.copyFromUtf8(jsonPayload).toByteArray());
}
@Override
public String publishBytesMessage(byte[] data) throws PubSubPublishException {
Publisher publisher = null;
try {
log.info("Attempting to publish payload to topic: {}", topicName.toString());
publisher = Publisher.newBuilder(topicName).build();
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(data)).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get(); // 阻塞等待消息发布完成
log.info("Published message with ID: {}", messageId);
return messageId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
log.error("PubSub publish interrupted for topic {}: {}", topicName.toString(), e.getMessage());
throw new PubSubPublishException("PubSub publish operation interrupted.", e);
} catch (ExecutionException e) {
log.error("Error during PubSub message publishing for topic {}: {}", topicName.toString(), e.getMessage());
throw new PubSubPublishException("Failed to publish message to PubSub.", e);
} catch (IOException e) {
log.error("PubSub publisher creation failed for topic {}: {}", topicName.toString(), e.getMessage());
throw new PubSubPublishException("Failed to create PubSub publisher.", e);
} catch (Exception e) { // 捕获其他所有未预期的异常
log.error("Unexpected error during PubSub publishing for topic {}: {}", topicName.toString(), e.getMessage());
throw new PubSubPublishException("An unexpected error occurred during PubSub publishing.", e);
} finally {
if (publisher != null) {
try {
// 确保Publisher资源被正确关闭
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
log.warn("PubSub publisher shutdown interrupted for topic {}.", topicName.toString());
}
}
}
}
}现在,修改你的业务逻辑服务类,使其通过构造函数注入PubSubPublisherService接口的实例。
package com.example.service;
import com.example.config.AppConfig;
import com.example.pubsub.PubSubPublishException;
import com.example.pubsub.PubSubPublisherService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 示例业务服务类,负责处理数据并将其发布到PubSub。
*/
public class MyBusinessService {
private static final Logger log = LoggerFactory.getLogger(MyBusinessService.class);
private final PubSubPublisherService pubSubPublisherService;
private final AppConfig appConfig; // 假设AppConfig包含项目和主题ID
/**
* 构造函数,通过依赖注入接收PubSubPublisherService和AppConfig。
*
* @param pubSubPublisherService PubSub发布服务的实例。
* @param appConfig 应用程序配置。
*/
public MyBusinessService(PubSubPublisherService pubSubPublisherService, AppConfig appConfig) {
this.pubSubPublisherService = pubSubPublisherService;
this.appConfig = appConfig;
// 在实际应用中,DefaultPubSubPublisherService的实例化可能在DI框架中完成
// 或者在这里根据appConfig创建DefaultPubSubPublisherService实例
// 比如:this.pubSubPublisherService = new DefaultPubSubPublisherService(appConfig.getPubsubProjectId(), appConfig.getTopicId());
}
/**
* 处理并发布JSON数据到PubSub。
*
* @param jsonInput 待处理和发布的JSON字符串。
* @return 发布成功的消息ID。
* @throws PubSubPublishException 如果PubSub发布失败。
*/
public String processAndPublishData(String jsonInput) throws PubSubPublishException {
// 这里可以包含任何业务逻辑,例如数据验证、转换等
log.info("Processing data for PubSub publication...");
// 假设这里有一些业务逻辑处理,然后调用PubSub服务
String messageId = pubSubPublisherService.publishJsonMessage(jsonInput);
log.info("Data successfully processed and published with ID: {}", messageId);
return messageId;
}
// 假设AppConfig类如下
public static class AppConfig {
private String pubsubProjectId;
private String topicId;
public AppConfig(String pubsubProjectId, String topicId) {
this.pubsubProjectId = pubsubProjectId;
this.topicId = topicId;
}
public String getPubsubProjectId() { return pubsubProjectId; }
public String getTopicId() { return topicId; }
}
}现在,我们可以轻松地为MyBusinessService编写单元测试,而无需实际连接到Pub/Sub。我们将模拟PubSubPublisherService接口。
package com.example.service;
import com.example.pubsub.PubSubPublishException;
import com.example.pubsub.PubSubPublisherService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
class MyBusinessServiceTest {
@Mock
private PubSubPublisherService pubSubPublisherService; // 模拟接口
@Mock
private MyBusinessService.AppConfig appConfig; // 模拟配置类
@InjectMocks
private MyBusinessService myBusinessService; // 注入被测试的服务
@BeforeEach
void setUp() {
// 初始化Mockito注解,将mock对象注入到myBusinessService中
MockitoAnnotations.openMocks(this);
// 模拟AppConfig的行为,如果MyBusinessService的构造函数需要
when(appConfig.getPubsubProjectId()).thenReturn("test-project");
when(appConfig.getTopicId()).thenReturn("test-topic");
// 注意:如果MyBusinessService的构造函数中手动创建了DefaultPubSubPublisherService,
// 则@InjectMocks可能无法按预期工作,需要手动实例化MyBusinessService并传入mock对象。
// 在当前设计中,MyBusinessService直接依赖PubSubPublisherService接口,所以@InjectMocks是有效的。
}
@Test
void testProcessAndPublishData_success() throws PubSubPublishException {
String testJsonPayload = "{\"data\":\"test_message\"}";
String expectedMessageId = "mock-message-id-123";
// 配置模拟对象的行为:当调用publishJsonMessage时,返回预期的消息ID
when(pubSubPublisherService.publishJsonMessage(testJsonPayload))
.thenReturn(expectedMessageId);
// 调用被测试方法
String actualMessageId = myBusinessService.processAndPublishData(testJsonPayload);
// 验证结果
assertEquals(expectedMessageId, actualMessageId);
// 验证pubSubPublisherService的publishJsonMessage方法是否被调用了一次,且参数正确
verify(pubSubPublisherService, times(1)).publishJsonMessage(testJsonPayload);
}
@Test
void testProcessAndPublishData_publishFailure() throws PubSubPublishException {
String testJsonPayload = "{\"data\":\"error_message\"}";
// 配置模拟对象的行为:当调用publishJsonMessage时,抛出自定义异常
when(pubSubPublisherService.publishJsonMessage(testJsonPayload))
.thenThrow(new PubSubPublishException("Mock PubSub publish error"));
// 验证被测试方法是否抛出了预期的异常
assertThrows(PubSubPublishException.class, () -> {
myBusinessService.processAndPublishData(testJsonPayload);
});
// 验证pubSubPublisherService的publishJsonMessage方法是否被调用了一次
verify(pubSubPublisherService, times(1)).publishJsonMessage(testJsonPayload);
}
@Test
void testProcessAndPublishData_emptyPayload() throws PubSubPublishException {
String emptyJsonPayload = "";
// 如果业务逻辑层有对空数据的处理,可以在这里测试
// 假设pubSubPublisherService在接收到空字符串时会抛出IllegalArgumentException
when(pubSubPublisherService.publishJsonMessage(emptyJsonPayload))
.thenThrow(new IllegalArgumentException("JSON payload cannot be null or empty."));
assertThrows(IllegalArgumentException.class, () -> {
myBusinessService.processAndPublishData(emptyJsonPayload);
});
verify(pubSubPublisherService, times(1)).publishJsonMessage(emptyJsonPayload);
}
}通过采用这种“封装外部服务”和“依赖注入”的策略,我们能够有效地对依赖于Google Cloud Pub/Sub的服务进行单元测试,从而提高代码质量、减少耦合,并确保业务逻辑的正确性。
以上就是策略性测试Google Cloud Pub/Sub发布服务的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号