首页 > Java > java教程 > 正文

策略性测试Google Cloud Pub/Sub发布服务

碧海醫心
发布: 2025-11-22 17:15:05
原创
960人浏览过

策略性测试google cloud pub/sub发布服务

本文旨在解决在JUnit测试中直接模拟Google Cloud Pub/Sub客户端库(如`Publisher.newBuilder()`)时遇到的挑战。核心策略是引入一个抽象层(接口和实现),将外部服务调用封装起来,从而实现对业务逻辑的独立测试,并提升代码的可维护性和可测试性。

1. 问题背景:直接模拟Google Cloud Pub/Sub客户端的挑战

在编写单元测试时,我们通常希望隔离被测试代码与外部依赖(如数据库、网络服务、第三方库)。对于Google Cloud Pub/Sub的Java客户端库,直接模拟Publisher.newBuilder()等链式调用的静态或最终方法会遇到困难。这是因为Mockito等模拟框架主要针对接口或非最终类的方法进行模拟,而Publisher.Builder的设计模式使得其内部状态和构建过程难以在不修改源代码的情况下进行拦截和模拟。

考虑以下原始代码片段,它直接使用了Pub/Sub客户端库来发布消息:

public String publishJSON(String json, AppConfig config) throws InterruptedException, IOException, ExecutionException {             
    log.info(" Publishing payload to: "+config.getTopicId());   
    TopicName topicName=TopicName.of(config.getPubsubProjectId(),config.getTopicId());
    Publisher publisher=null;
    try {
         publisher =
                Publisher.newBuilder(topicName)
                    .build();
          ByteString data = ByteString.copyFromUtf8(json);
          PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
          ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
          String messageId = messageIdFuture.get();
          log.info("Published message ID: " + messageId);
       return messageId;
    } catch (ExecutionException e) {
        log.error("Error while publishing messsage" + e.getMessage());
        throw e;
    } catch (IOException e) {
        log.error( "PubSub exception "+ e.getMessage());
        throw e;
    } catch (InterruptedException e) {
        log.error("Connection making exception for PubSub" + e.getMessage());
        throw e;
    } catch (Exception e) {
        log.error( "publishJSON Error : "+ e.getMessage());
        throw e;    
    }
    finally {
        if (publisher != null) {
            // When finished with the publisher, shutdown to free up resources.
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
登录后复制

这段代码直接创建并管理Publisher实例。在单元测试中,我们不希望真正连接到Pub/Sub服务,因此需要一种方式来“假装”发布成功或失败。

2. 解决方案:引入抽象层与依赖注入

为了有效测试依赖于外部服务的代码,最佳实践是引入一个抽象层。这意味着我们将直接调用外部库的代码封装在一个独立的类中,并通过接口对其进行抽象。然后,我们的业务逻辑代码将依赖于这个接口,而不是具体的实现。在测试时,我们可以轻松地模拟这个接口。

2.1 定义Pub/Sub发布服务接口

首先,定义一个接口来抽象Pub/Sub的发布操作。

Smart Picture
Smart Picture

Smart Picture 智能高效的图片处理工具

Smart Picture 77
查看详情 Smart Picture
package com.example.pubsub;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

/**
 * PubSub消息发布服务的接口。
 * 定义了向PubSub主题发布消息的核心操作。
 */
public interface PubSubPublisherService {

    /**
     * 发布一个JSON字符串作为PubSub消息。
     *
     * @param jsonPayload 要发布的JSON字符串。
     * @return 发布成功的消息ID。
     * @throws PubSubPublishException 如果发布过程中发生错误。
     */
    String publishJsonMessage(String jsonPayload) throws PubSubPublishException;

    /**
     * 发布一个字节数组作为PubSub消息。
     *
     * @param data 要发布的字节数据。
     * @return 发布成功的消息ID。
     * @throws PubSubPublishException 如果发布过程中发生错误。
     */
    String publishBytesMessage(byte[] data) throws PubSubPublishException;
}
登录后复制

为了更好地处理发布过程中可能出现的各种异常,我们定义一个自定义异常类:

package com.example.pubsub;

/**
 * PubSub发布操作的自定义异常。
 */
public class PubSubPublishException extends Exception {
    public PubSubPublishException(String message) {
        super(message);
    }

    public PubSubPublishException(String message, Throwable cause) {
        super(message, cause);
    }
}
登录后复制

2.2 实现Pub/Sub发布服务

接下来,创建接口的具体实现,它将包含实际的Google Cloud Pub/Sub客户端调用逻辑。

package com.example.pubsub;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 默认的PubSubPublisherService实现,直接与Google Cloud Pub/Sub交互。
 */
public class DefaultPubSubPublisherService implements PubSubPublisherService {

    private static final Logger log = LoggerFactory.getLogger(DefaultPubSubPublisherService.class);
    private final TopicName topicName;

    /**
     * 构造函数,初始化PubSub发布服务的配置。
     *
     * @param projectId Google Cloud项目ID。
     * @param topicId PubSub主题ID。
     */
    public DefaultPubSubPublisherService(String projectId, String topicId) {
        this.topicName = TopicName.of(projectId, topicId);
    }

    @Override
    public String publishJsonMessage(String jsonPayload) throws PubSubPublishException {
        if (jsonPayload == null || jsonPayload.isEmpty()) {
            throw new IllegalArgumentException("JSON payload cannot be null or empty.");
        }
        return publishBytesMessage(ByteString.copyFromUtf8(jsonPayload).toByteArray());
    }

    @Override
    public String publishBytesMessage(byte[] data) throws PubSubPublishException {
        Publisher publisher = null;
        try {
            log.info("Attempting to publish payload to topic: {}", topicName.toString());
            publisher = Publisher.newBuilder(topicName).build();
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(data)).build();
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
            String messageId = messageIdFuture.get(); // 阻塞等待消息发布完成
            log.info("Published message with ID: {}", messageId);
            return messageId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断状态
            log.error("PubSub publish interrupted for topic {}: {}", topicName.toString(), e.getMessage());
            throw new PubSubPublishException("PubSub publish operation interrupted.", e);
        } catch (ExecutionException e) {
            log.error("Error during PubSub message publishing for topic {}: {}", topicName.toString(), e.getMessage());
            throw new PubSubPublishException("Failed to publish message to PubSub.", e);
        } catch (IOException e) {
            log.error("PubSub publisher creation failed for topic {}: {}", topicName.toString(), e.getMessage());
            throw new PubSubPublishException("Failed to create PubSub publisher.", e);
        } catch (Exception e) { // 捕获其他所有未预期的异常
            log.error("Unexpected error during PubSub publishing for topic {}: {}", topicName.toString(), e.getMessage());
            throw new PubSubPublishException("An unexpected error occurred during PubSub publishing.", e);
        } finally {
            if (publisher != null) {
                try {
                    // 确保Publisher资源被正确关闭
                    publisher.shutdown();
                    publisher.awaitTermination(1, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢复中断状态
                    log.warn("PubSub publisher shutdown interrupted for topic {}.", topicName.toString());
                }
            }
        }
    }
}
登录后复制

2.3 修改业务服务类以使用接口

现在,修改你的业务逻辑服务类,使其通过构造函数注入PubSubPublisherService接口的实例。

package com.example.service;

import com.example.config.AppConfig;
import com.example.pubsub.PubSubPublishException;
import com.example.pubsub.PubSubPublisherService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 示例业务服务类,负责处理数据并将其发布到PubSub。
 */
public class MyBusinessService {

    private static final Logger log = LoggerFactory.getLogger(MyBusinessService.class);
    private final PubSubPublisherService pubSubPublisherService;
    private final AppConfig appConfig; // 假设AppConfig包含项目和主题ID

    /**
     * 构造函数,通过依赖注入接收PubSubPublisherService和AppConfig。
     *
     * @param pubSubPublisherService PubSub发布服务的实例。
     * @param appConfig 应用程序配置。
     */
    public MyBusinessService(PubSubPublisherService pubSubPublisherService, AppConfig appConfig) {
        this.pubSubPublisherService = pubSubPublisherService;
        this.appConfig = appConfig;
        // 在实际应用中,DefaultPubSubPublisherService的实例化可能在DI框架中完成
        // 或者在这里根据appConfig创建DefaultPubSubPublisherService实例
        // 比如:this.pubSubPublisherService = new DefaultPubSubPublisherService(appConfig.getPubsubProjectId(), appConfig.getTopicId());
    }

    /**
     * 处理并发布JSON数据到PubSub。
     *
     * @param jsonInput 待处理和发布的JSON字符串。
     * @return 发布成功的消息ID。
     * @throws PubSubPublishException 如果PubSub发布失败。
     */
    public String processAndPublishData(String jsonInput) throws PubSubPublishException {
        // 这里可以包含任何业务逻辑,例如数据验证、转换等
        log.info("Processing data for PubSub publication...");
        // 假设这里有一些业务逻辑处理,然后调用PubSub服务
        String messageId = pubSubPublisherService.publishJsonMessage(jsonInput);
        log.info("Data successfully processed and published with ID: {}", messageId);
        return messageId;
    }

    // 假设AppConfig类如下
    public static class AppConfig {
        private String pubsubProjectId;
        private String topicId;

        public AppConfig(String pubsubProjectId, String topicId) {
            this.pubsubProjectId = pubsubProjectId;
            this.topicId = topicId;
        }

        public String getPubsubProjectId() { return pubsubProjectId; }
        public String getTopicId() { return topicId; }
    }
}
登录后复制

3. 编写JUnit测试

现在,我们可以轻松地为MyBusinessService编写单元测试,而无需实际连接到Pub/Sub。我们将模拟PubSubPublisherService接口。

package com.example.service;

import com.example.pubsub.PubSubPublishException;
import com.example.pubsub.PubSubPublisherService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

class MyBusinessServiceTest {

    @Mock
    private PubSubPublisherService pubSubPublisherService; // 模拟接口

    @Mock
    private MyBusinessService.AppConfig appConfig; // 模拟配置类

    @InjectMocks
    private MyBusinessService myBusinessService; // 注入被测试的服务

    @BeforeEach
    void setUp() {
        // 初始化Mockito注解,将mock对象注入到myBusinessService中
        MockitoAnnotations.openMocks(this);
        // 模拟AppConfig的行为,如果MyBusinessService的构造函数需要
        when(appConfig.getPubsubProjectId()).thenReturn("test-project");
        when(appConfig.getTopicId()).thenReturn("test-topic");
        // 注意:如果MyBusinessService的构造函数中手动创建了DefaultPubSubPublisherService,
        // 则@InjectMocks可能无法按预期工作,需要手动实例化MyBusinessService并传入mock对象。
        // 在当前设计中,MyBusinessService直接依赖PubSubPublisherService接口,所以@InjectMocks是有效的。
    }

    @Test
    void testProcessAndPublishData_success() throws PubSubPublishException {
        String testJsonPayload = "{\"data\":\"test_message\"}";
        String expectedMessageId = "mock-message-id-123";

        // 配置模拟对象的行为:当调用publishJsonMessage时,返回预期的消息ID
        when(pubSubPublisherService.publishJsonMessage(testJsonPayload))
                .thenReturn(expectedMessageId);

        // 调用被测试方法
        String actualMessageId = myBusinessService.processAndPublishData(testJsonPayload);

        // 验证结果
        assertEquals(expectedMessageId, actualMessageId);
        // 验证pubSubPublisherService的publishJsonMessage方法是否被调用了一次,且参数正确
        verify(pubSubPublisherService, times(1)).publishJsonMessage(testJsonPayload);
    }

    @Test
    void testProcessAndPublishData_publishFailure() throws PubSubPublishException {
        String testJsonPayload = "{\"data\":\"error_message\"}";

        // 配置模拟对象的行为:当调用publishJsonMessage时,抛出自定义异常
        when(pubSubPublisherService.publishJsonMessage(testJsonPayload))
                .thenThrow(new PubSubPublishException("Mock PubSub publish error"));

        // 验证被测试方法是否抛出了预期的异常
        assertThrows(PubSubPublishException.class, () -> {
            myBusinessService.processAndPublishData(testJsonPayload);
        });

        // 验证pubSubPublisherService的publishJsonMessage方法是否被调用了一次
        verify(pubSubPublisherService, times(1)).publishJsonMessage(testJsonPayload);
    }

    @Test
    void testProcessAndPublishData_emptyPayload() throws PubSubPublishException {
        String emptyJsonPayload = "";

        // 如果业务逻辑层有对空数据的处理,可以在这里测试
        // 假设pubSubPublisherService在接收到空字符串时会抛出IllegalArgumentException
        when(pubSubPublisherService.publishJsonMessage(emptyJsonPayload))
            .thenThrow(new IllegalArgumentException("JSON payload cannot be null or empty."));

        assertThrows(IllegalArgumentException.class, () -> {
            myBusinessService.processAndPublishData(emptyJsonPayload);
        });

        verify(pubSubPublisherService, times(1)).publishJsonMessage(emptyJsonPayload);
    }
}
登录后复制

4. 注意事项与总结

  • 依赖注入的重要性: 通过依赖注入(Dependency Injection, DI),我们的业务服务不再直接创建外部服务的实例,而是通过构造函数或setter方法接收它们。这使得测试替换真实依赖变得简单。
  • 模拟粒度: 模拟应该发生在系统边界。对于外部服务,我们应该模拟其接口,而不是尝试模拟其内部的复杂实现细节(如Publisher.Builder)。
  • 错误处理: 封装层(DefaultPubSubPublisherService)负责捕获并转换底层的特定异常(如IOException, ExecutionException)为更具业务含义的自定义异常(PubSubPublishException),这有助于提高业务逻辑层的可读性和错误处理的统一性。
  • 资源管理: 在DefaultPubSubPublisherService中,务必在finally块中正确关闭Publisher资源,调用publisher.shutdown()和publisher.awaitTermination(),以避免资源泄露。
  • 集成测试: 尽管单元测试非常重要,但它不能替代集成测试。在某些情况下,你可能还需要编写集成测试来验证DefaultPubSubPublisherService与真实的Google Cloud Pub/Sub服务是否能正确交互。
  • Spring Boot集成: 如果你使用Spring Boot等框架,可以利用其DI容器(如Spring IoC)来管理PubSubPublisherService的实例。例如,将DefaultPubSubPublisherService声明为一个@Service或@Component,并通过@Autowired注入到MyBusinessService中。

通过采用这种“封装外部服务”和“依赖注入”的策略,我们能够有效地对依赖于Google Cloud Pub/Sub的服务进行单元测试,从而提高代码质量、减少耦合,并确保业务逻辑的正确性。

以上就是策略性测试Google Cloud Pub/Sub发布服务的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号