背景

本文是《Java 后端从小白到大神》修仙系列之框架学习,Spring Cloud 微服务实战系列的 第九篇。Spring Cloud 是构建微服务架构的基石,拥有完整的服务治理生态,在云原生架构中广泛应用。本系列将从架构认知到实际工程,逐步构建一套企业级 Spring Cloud 微服务项目。若想详细学习请点击首篇博文开始,现在开始学习。

文章概览

消息驱动通信机制:Kafka + Spring Cloud Stream:

  1. 为什么选择消息驱动通信?场景 & 通信机制对比
  2. Spring Cloud Stream + Kafka 核心概念
  3. 幂等性机制:理论 + 数据库落地实现
  4. 消息重试机制:Spring Retry + 死信队列(DLT)
  5. 延迟队列实现:定时调度 + 延迟 Topic 模拟
  6. 实战 NotifyService:生产 + 消费 + 幂等 + 死信处理

1. 消息队列通信场景与对比

在微服务架构中,系统之间的通信不仅限于同步调用(如 HTTP、Feign),更多场景使用 消息队列 进行 异步解耦削峰填谷事件驱动。本篇将以 Kafka 为基础,结合 Spring Cloud Stream 框架,构建一套 消息驱动的微服务通信体系

通信机制 特点 适用场景
HTTP REST 同步阻塞,耦合性高 配置中心、权限校验
OpenFeign 接口友好,同步依赖 服务内部数据调用
消息队列 MQ 异步解耦,可靠传输 异步任务、通知推送、日志收集

为什么选 Kafka?

  • 吞吐量高,低延迟,适合高并发场景
  • 保证顺序消费,适合事件驱动
  • Spring Cloud Stream 原生支持

2. Spring Cloud Stream 核心概念

Spring Cloud Stream 是 Spring 出品的 事件驱动编程模型,其架构:

  • Binder:连接器(适配 Kafka、RabbitMQ 等)
  • Producer:消息发送者
  • Consumer:消息监听者
  • MessageChannel:通道(logical topic)
  • @EnableBinding / @StreamListener / Functional Binding

核心优势: 统一 API、简化开发、无需关心底层实现(只配置 binder)。

application.yaml 示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
spring:
  cloud:
    stream:
      bindings:
        notify-out:
          destination: notify-topic
        notify-in:
          destination: notify-topic
          group: notify-group
      kafka:
        binder:
          brokers: localhost:9092

3. 幂等性设计与重复消费处理

Kafka 默认采用 “至少一次投递",存在重复消费风险。

幂等处理策略:

手段 说明
消息 ID 唯一 每条消息生成 messageId,作为幂等主键
数据库约束 messageId 唯一索引,防止重复插入
Redis 去重 可用 SET 结构临时判断是否消费过

实现代码:

1
2
3
4
5
6
7
8
9
@Transactional
public void processMessage(NotifyMessage message) {
    if (notifyRecordRepository.findByMessageId(message.getMessageId()).isPresent()) {
        log.warn("重复消息,跳过处理: {}", message.getMessageId());
        return;
    }
    // 正常业务处理
    notifyRecordRepository.save(new NotifyRecord(...));
}

4. 消息失败重试机制 + 死信队列

Kafka 消费失败时,默认行为是跳过或重复消费,可借助 Spring Cloud Stream 提供的重试机制与 DLT(Dead Letter Topic)进行改进。

配置:application.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
spring:
  cloud:
    stream:
      bindings:
        notify-in:
          destination: notify-topic
          group: notify-group
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000

当超过最大尝试次数后,消息将被发送到 notify-topic.notify-group.dlq 死信队列中。

死信消费监听器

1
2
3
4
@Bean
public Consumer<NotifyMessage> dlqListener() {
    return msg -> log.error("死信队列消息:{}", msg);
}

5. 延迟队列机制实现

Kafka 原生不支持延迟投递,可采用如下方式:

方式一:数据库 + 定时轮询(推荐)

  1. 消息发送到延迟消息表,设置执行时间
  2. Spring 定时任务每隔 N 秒查询
  3. 满足延迟时间的消息转投 Kafka 主题执行
1
2
3
4
5
6
7
@Scheduled(fixedDelay = 5000)
public void scanDelayedMessages() {
    List<NotifyMessage> messages = delayMessageRepository.findReadyToSend(LocalDateTime.now());
    for (NotifyMessage msg : messages) {
        streamBridge.send("notify-out", msg);
    }
}

方式二:构造延迟 Topic + Scheduler 控制消费

  • 利用 Topic 分区/时间戳模拟消费延迟
  • 或引入外部中间件如 Kafka-Delay-Queue(复杂)

6. NotifyService 项目实战

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
notify-service
├── controller/NotifyController.java      // 消息生产者接口
├── stream/NotifySource.java              // 生产者通道
├── stream/NotifyConsumer.java            // 消息消费者函数
├── stream/NotifyConsumer.java            // 消费者监听
├── dto/NotifyMessage.java                // 消息结构
├── entity/NotifyRecord.java              // 数据落地 + 幂等性
├── repository/NotifyRecordRepository.java // 消息记录存储
├── scheduler/DelayMessageScheduler.java // 延迟调度实现
├── resources/application.yaml
└── pom.xml

pom.xml 添加依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

application.yaml 配置 Kafka binder

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
server:
  port: 8087
spring:
  application:
    name: notify-service
  cloud:
    stream:
      bindings:
        notify-out-0:
          destination: notify-topic
        notify-in-0:
          destination: notify-topic
          group: notify-group
      kafka:
        binder:
          brokers: localhost:9092

NotifySource.java 通道结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package com.yutao.stream;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class NotifySource {
    private final StreamBridge streamBridge;

    public void send(NotifyMessage message) {
        streamBridge.send("notify-out-0", message);
    }
}

使用 StreamBridge 灵活发送消息,替代旧的 @Output 模式。

NotifyMessage.java 消息结构 DTO

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NotifyMessage {
    private String messageId; // 幂等唯一 ID(例如 UUID 或业务组合)
    private Long userId;
    private String content;
}

NotifyRecord.java 消息记录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Entity
@Table(name = "notify_records", indexes = {
    @Index(name = "idx_message_id", columnList = "messageId", unique = true)
})
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NotifyRecord {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String messageId; // 幂等校验字段

    private Long userId;

    @Column(length = 1024)
    private String content;

    private String status; // e.g., "RECEIVED", "PROCESSED", "FAILED"

    private LocalDateTime createdAt;

    private LocalDateTime processedAt;
}

NotifyRecordRepository.java 消息保存

1
2
3
public interface NotifyRecordRepository extends JpaRepository<NotifyRecord, Long> {
    Optional<NotifyRecord> findByMessageId(String messageId);
}

模拟生产者发送

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/notify")
@RequiredArgsConstructor
public class NotifyController {
    private final NotifySource notifySource; // 之前定义的发送封装

    @PostMapping("/send")
    public String send(@RequestBody Map<String, Object> payload) {
        String messageId = UUID.randomUUID().toString();
        Long userId = Long.valueOf(payload.get("userId").toString());
        String content = payload.get("content").toString();

        NotifyMessage msg = new NotifyMessage(messageId, userId, content);
        notifySource.send(msg);
        return "sent";
    }

}

NotifyConsumer.java 消息消费

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Component
@RequiredArgsConstructor
public class NotifyConsumer {

    private static final Logger log = LoggerFactory.getLogger(NotifyConsumer.class);

    private final NotifyRecordRepository notifyRecordRepository;

    @Bean
    public Consumer<NotifyMessage> notifyIn() {
        return message -> {
            log.info("收到通知消息: {}", message);
            // 幂等检查 + 处理
            try {
                processMessage(message);
            } catch (Exception e) {
                log.error("处理通知消息失败, messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
                // 失败后可考虑写入失败表/重试机制
            }
        };
    }

    @Transactional
    public void processMessage(NotifyMessage message) {
        // 幂等:已有记录则跳过
        String messageId = message.getMessageId();
        if (messageId == null || messageId.isBlank()) {
            throw new IllegalArgumentException("messageId 不能为空");
        }

        boolean exists = notifyRecordRepository.findByMessageId(messageId).isPresent();
        if (exists) {
            log.warn("重复消息,已处理过,messageId={}", messageId);
            return;
        }

        // 保存为已接收
        NotifyRecord record = new NotifyRecord();
        record.setMessageId(messageId);
        record.setUserId(message.getUserId());
        record.setContent(message.getContent());
        record.setStatus("PROCESSED");
        record.setCreatedAt(LocalDateTime.now());
        record.setProcessedAt(LocalDateTime.now());

        notifyRecordRepository.save(record);

        // 这里可以扩展:例如发送邮件/推送等后续逻辑
        log.info("通知持久化成功, messageId={}, userId={}", messageId, message.getUserId());
    }
}

DelayMessageScheduler.java 延迟调度实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Component
public class DelayMessageScheduler {
    @Autowired private NotifyRecordRepository repository;
    @Autowired private StreamBridge streamBridge;

    @Scheduled(fixedDelay = 5000)
    public void scan() {
        List<NotifyRecord> messages = repository.findReadyToSend(LocalDateTime.now());
        for (NotifyRecord record : messages) {
            NotifyMessage msg = new NotifyMessage(record.getMessageId(), record.getContent(), LocalDateTime.now());
            streamBridge.send("notify-out", msg);
        }
    }
}

总结

  • Spring Cloud Stream 是微服务事件驱动的理想选择
  • 使用 Kafka 实现高性能异步通信
  • 结合 StreamBridge、函数式编程,让生产与消费更灵活
  • 实际项目中,务必加上消息幂等、日志跟踪与异常处理机制