创建一个controller包并编写一个测试类用于发送消息
package com.my.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("hello")
public String helloProducer(){
kafkaTemplate.send("my-topic","Hello~");
return "ok";
}
}编写测试类用于接收消息:
package com.my.kafka.listener;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "my-topic")
public void helloListener(String message) {
if(StringUtils.isNotBlank(message)) {
System.out.println(message);
}
}
}打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。
@GetMapping("hello")
public String helloProducer(){
User user = new User();
user.setName("赵四");
user.setAge(20);
kafkaTemplate.send("my-topic", JSON.toJSONString(user));
return "ok";
}
可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。
发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。


后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。
文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>package com.my.common.constans;
public class WmNewsMessageConstants {
public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。
(1)自媒体端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer(2)移动端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 文章下架或上架
* @param id
* @param enable
* @return
*/
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
log.info("执行文章上下架操作...");
if(id == null || enable == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//根据id获取文章
WmNews news = getById(id);
if(news == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
}
//获取当前文章状态
Short status = news.getStatus();
if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
}
//更改文章状态
news.setEnable(enable.shortValue());
updateById(news);
log.info("更改文章上架状态{}-->{}",status,news.getEnable());
//发送消息到Kafka
Map<String, Object> map = new HashMap<>();
map.put("articleId",news.getArticleId());
map.put("enable",enable.shortValue());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
log.info("发送消息到Kafka...");
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}(1)设置监听器
package com.my.article.listener;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
@Slf4j
@Component
public class EnableListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void downOrUp(String message) {
if(StringUtils.isNotBlank(message)) {
log.info("监听到消息{}",message);
apArticleService.downOrUp(message);
}
}
}(2)获取消息并修改文章状态
/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
Map map = JSON.parseObject(message, Map.class);
//获取文章id
Long articleId = (Long) map.get("articleId");
//获取文章待修改状态
Integer enable = (Integer) map.get("enable");
//查询文章配置
ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
(Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
if(apArticleConfig != null) {
//上架
if(enable == 1) {
log.info("文章重新上架");
apArticleConfig.setIsDown(false);
apArticleConfigMapper.updateById(apArticleConfig);
}
//下架
if(enable == 0) {
log.info("文章下架");
apArticleConfig.setIsDown(true);
apArticleConfigMapper.updateById(apArticleConfig);
}
}
else {
throw new RuntimeException("文章信息不存在");
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}以上就是Springboot微服务项目整合Kafka如何实现文章上下架功能的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号