背景
本文是《Java 后端从小白到大神》修仙系列之框架学习,Spring Cloud 微服务实战系列的 第九篇
。Spring Cloud 是构建微服务架构的基石,拥有完整的服务治理生态,在云原生架构中广泛应用。本系列将从架构认知到实际工程,逐步构建一套企业级 Spring Cloud 微服务项目。若想详细学习请点击首篇博文开始,现在开始学习。
文章概览
消息驱动通信机制:Kafka + Spring Cloud Stream:
- 为什么选择消息驱动通信?场景 & 通信机制对比
- Spring Cloud Stream + Kafka 核心概念
- 幂等性机制:理论 + 数据库落地实现
- 消息重试机制:Spring Retry + 死信队列(DLT)
- 延迟队列实现:定时调度 + 延迟 Topic 模拟
- 实战 NotifyService:生产 + 消费 + 幂等 + 死信处理
1. 消息队列通信场景与对比
在微服务架构中,系统之间的通信不仅限于同步调用(如 HTTP、Feign),更多场景使用 消息队列 进行 异步解耦、削峰填谷 和 事件驱动。本篇将以 Kafka 为基础,结合 Spring Cloud Stream 框架,构建一套 消息驱动的微服务通信体系。
通信机制 |
特点 |
适用场景 |
HTTP REST |
同步阻塞,耦合性高 |
配置中心、权限校验 |
OpenFeign |
接口友好,同步依赖 |
服务内部数据调用 |
消息队列 MQ |
异步解耦,可靠传输 |
异步任务、通知推送、日志收集 |
为什么选 Kafka?
- 吞吐量高,低延迟,适合高并发场景
- 保证顺序消费,适合事件驱动
- Spring Cloud Stream 原生支持
2. Spring Cloud Stream 核心概念
Spring Cloud Stream 是 Spring 出品的 事件驱动编程模型,其架构:
- Binder:连接器(适配 Kafka、RabbitMQ 等)
- Producer:消息发送者
- Consumer:消息监听者
- MessageChannel:通道(logical topic)
- @EnableBinding / @StreamListener / Functional Binding
核心优势: 统一 API、简化开发、无需关心底层实现(只配置 binder)。
application.yaml 示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
spring:
cloud:
stream:
bindings:
notify-out:
destination: notify-topic
notify-in:
destination: notify-topic
group: notify-group
kafka:
binder:
brokers: localhost:9092
|
3. 幂等性设计与重复消费处理
Kafka 默认采用 “至少一次投递",存在重复消费风险。
幂等处理策略:
手段 |
说明 |
消息 ID 唯一 |
每条消息生成 messageId,作为幂等主键 |
数据库约束 |
messageId 唯一索引,防止重复插入 |
Redis 去重 |
可用 SET 结构临时判断是否消费过 |
实现代码:
1
2
3
4
5
6
7
8
9
|
@Transactional
public void processMessage(NotifyMessage message) {
if (notifyRecordRepository.findByMessageId(message.getMessageId()).isPresent()) {
log.warn("重复消息,跳过处理: {}", message.getMessageId());
return;
}
// 正常业务处理
notifyRecordRepository.save(new NotifyRecord(...));
}
|
4. 消息失败重试机制 + 死信队列
Kafka 消费失败时,默认行为是跳过或重复消费,可借助 Spring Cloud Stream 提供的重试机制与 DLT(Dead Letter Topic)进行改进。
配置:application.yaml
1
2
3
4
5
6
7
8
9
10
|
spring:
cloud:
stream:
bindings:
notify-in:
destination: notify-topic
group: notify-group
consumer:
max-attempts: 3
back-off-initial-interval: 1000
|
当超过最大尝试次数后,消息将被发送到 notify-topic.notify-group.dlq
死信队列中。
死信消费监听器
1
2
3
4
|
@Bean
public Consumer<NotifyMessage> dlqListener() {
return msg -> log.error("死信队列消息:{}", msg);
}
|
5. 延迟队列机制实现
Kafka 原生不支持延迟投递,可采用如下方式:
方式一:数据库 + 定时轮询(推荐)
- 消息发送到延迟消息表,设置执行时间
- Spring 定时任务每隔 N 秒查询
- 满足延迟时间的消息转投 Kafka 主题执行
1
2
3
4
5
6
7
|
@Scheduled(fixedDelay = 5000)
public void scanDelayedMessages() {
List<NotifyMessage> messages = delayMessageRepository.findReadyToSend(LocalDateTime.now());
for (NotifyMessage msg : messages) {
streamBridge.send("notify-out", msg);
}
}
|
方式二:构造延迟 Topic + Scheduler 控制消费
- 利用 Topic 分区/时间戳模拟消费延迟
- 或引入外部中间件如
Kafka-Delay-Queue
(复杂)
6. NotifyService 项目实战
1
2
3
4
5
6
7
8
9
10
11
|
notify-service
├── controller/NotifyController.java // 消息生产者接口
├── stream/NotifySource.java // 生产者通道
├── stream/NotifyConsumer.java // 消息消费者函数
├── stream/NotifyConsumer.java // 消费者监听
├── dto/NotifyMessage.java // 消息结构
├── entity/NotifyRecord.java // 数据落地 + 幂等性
├── repository/NotifyRecordRepository.java // 消息记录存储
├── scheduler/DelayMessageScheduler.java // 延迟调度实现
├── resources/application.yaml
└── pom.xml
|
pom.xml 添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
|
application.yaml 配置 Kafka binder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
server:
port: 8087
spring:
application:
name: notify-service
cloud:
stream:
bindings:
notify-out-0:
destination: notify-topic
notify-in-0:
destination: notify-topic
group: notify-group
kafka:
binder:
brokers: localhost:9092
|
NotifySource.java 通道结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package com.yutao.stream;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class NotifySource {
private final StreamBridge streamBridge;
public void send(NotifyMessage message) {
streamBridge.send("notify-out-0", message);
}
}
|
使用 StreamBridge
灵活发送消息,替代旧的 @Output 模式。
NotifyMessage.java 消息结构 DTO
1
2
3
4
5
6
7
8
|
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NotifyMessage {
private String messageId; // 幂等唯一 ID(例如 UUID 或业务组合)
private Long userId;
private String content;
}
|
NotifyRecord.java 消息记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Entity
@Table(name = "notify_records", indexes = {
@Index(name = "idx_message_id", columnList = "messageId", unique = true)
})
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NotifyRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String messageId; // 幂等校验字段
private Long userId;
@Column(length = 1024)
private String content;
private String status; // e.g., "RECEIVED", "PROCESSED", "FAILED"
private LocalDateTime createdAt;
private LocalDateTime processedAt;
}
|
NotifyRecordRepository.java 消息保存
1
2
3
|
public interface NotifyRecordRepository extends JpaRepository<NotifyRecord, Long> {
Optional<NotifyRecord> findByMessageId(String messageId);
}
|
模拟生产者发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@RestController
@RequestMapping("/notify")
@RequiredArgsConstructor
public class NotifyController {
private final NotifySource notifySource; // 之前定义的发送封装
@PostMapping("/send")
public String send(@RequestBody Map<String, Object> payload) {
String messageId = UUID.randomUUID().toString();
Long userId = Long.valueOf(payload.get("userId").toString());
String content = payload.get("content").toString();
NotifyMessage msg = new NotifyMessage(messageId, userId, content);
notifySource.send(msg);
return "sent";
}
}
|
NotifyConsumer.java 消息消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
@Component
@RequiredArgsConstructor
public class NotifyConsumer {
private static final Logger log = LoggerFactory.getLogger(NotifyConsumer.class);
private final NotifyRecordRepository notifyRecordRepository;
@Bean
public Consumer<NotifyMessage> notifyIn() {
return message -> {
log.info("收到通知消息: {}", message);
// 幂等检查 + 处理
try {
processMessage(message);
} catch (Exception e) {
log.error("处理通知消息失败, messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
// 失败后可考虑写入失败表/重试机制
}
};
}
@Transactional
public void processMessage(NotifyMessage message) {
// 幂等:已有记录则跳过
String messageId = message.getMessageId();
if (messageId == null || messageId.isBlank()) {
throw new IllegalArgumentException("messageId 不能为空");
}
boolean exists = notifyRecordRepository.findByMessageId(messageId).isPresent();
if (exists) {
log.warn("重复消息,已处理过,messageId={}", messageId);
return;
}
// 保存为已接收
NotifyRecord record = new NotifyRecord();
record.setMessageId(messageId);
record.setUserId(message.getUserId());
record.setContent(message.getContent());
record.setStatus("PROCESSED");
record.setCreatedAt(LocalDateTime.now());
record.setProcessedAt(LocalDateTime.now());
notifyRecordRepository.save(record);
// 这里可以扩展:例如发送邮件/推送等后续逻辑
log.info("通知持久化成功, messageId={}, userId={}", messageId, message.getUserId());
}
}
|
DelayMessageScheduler.java 延迟调度实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Component
public class DelayMessageScheduler {
@Autowired private NotifyRecordRepository repository;
@Autowired private StreamBridge streamBridge;
@Scheduled(fixedDelay = 5000)
public void scan() {
List<NotifyRecord> messages = repository.findReadyToSend(LocalDateTime.now());
for (NotifyRecord record : messages) {
NotifyMessage msg = new NotifyMessage(record.getMessageId(), record.getContent(), LocalDateTime.now());
streamBridge.send("notify-out", msg);
}
}
}
|
总结
- Spring Cloud Stream 是微服务事件驱动的理想选择
- 使用 Kafka 实现高性能异步通信
- 结合 StreamBridge、函数式编程,让生产与消费更灵活
- 实际项目中,务必加上消息幂等、日志跟踪与异常处理机制