pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。
Pulsar 的特性包括:
Pulsar 的架构主要包括以下组件:
Pulsar 支持四种订阅模式:
下载和安装 Pulsar 2.9.1 版本后,可以在 Linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。
在 Spring Boot 中集成 Pulsar 需要以下步骤:
引入 Maven 依赖:
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.1.0</version>
</dependency>配置 application.yml:
pulsar: service-url: pulsar://192.168.0.105:6650
创建 Pulsar 配置类:
@Configuration
public class PulsarConfig {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory().addProducer("testTopic", String.class);
}
}定义 Topic 名称常量类:
public class TopicName {
private TopicName(){}
public static final String TEST_TOPIC = "testTopic";
}创建消息生产者类:
@Component
public class PulsarProducer<T> {
@Resource
private PulsarTemplate<T> template;
public void send(String topic, T message) {
try {
template.send(topic, message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}创建消息消费者类:
@Component
public class TestTopicPulsarConsumer {
private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class);
@PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class)
public void consume(String message) {
log.info("PulsarRealConsumer content:{}", message);
}
}创建 PulsarController 测试发送消息:
@RestController
@RequestMapping("/pulsar")
public class PulsarController {
@Resource
private PulsarProducer<String> pulsarProducer;
@PostMapping(value = "/sendMessage")
public CommonResponse<String> sendMessage(@RequestParam(name = "message") String message) {
pulsarProducer.send(TopicName.TEST_TOPIC, message);
return CommonResponse.success("done");
}
}定义公共响应体类:
public class CommonResponse<T> {
private String code;
private Boolean success;
private T data;
public static <T> CommonResponse<T> success(T t){
return new CommonResponse<>("200",true,t);
}
public CommonResponse(String code, Boolean success, T data) {
this.code = code;
this.success = success;
this.data = data;
}
//getter、setter方法
}启动项目后,可以使用 Postman 测试消息发送和接收功能。








以上就是Pulsar中间件入门学习的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号