econets-vue开发指南 econets-vue开发指南
首页
  • 萌新必读
  • 后端手册
  • 中间件手册
  • 工作流手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 会员手册
  • 商城手册
  • 公众号手册
  • CRM手册
  • 运维手册
GitHub (opens new window)
首页
  • 萌新必读
  • 后端手册
  • 中间件手册
  • 工作流手册
  • 工作流手册
  • 大屏手册
  • 支付手册
  • 会员手册
  • 商城手册
  • 公众号手册
  • CRM手册
  • 运维手册
GitHub (opens new window)
  • 萌新必读

    • 简介
    • 功能列表
    • 快速启动(后端项目)
    • 快速启动(前端项目)
    • 技术选型
    • 项目结构
    • 代码热加载
    • 一键改包
    • 删除功能
    • 内网穿透
  • 后端手册

    • 新建服务
    • 代码生成【单表】(新增功能)
    • 代码生成【主子表】
    • 代码生成(树表)
    • 功能权限
    • 数据权限
    • 用户体系
    • 三方登录
    • OAuth 2.0(SSO 单点登录)
    • SaaS 多租户【字段隔离】
    • SaaS 多租户【数据库隔离】
    • WebSocket 实时通信
    • 13异常处理(错误码)
    • 参数校验
    • 分页实现
    • 文件存储(上传下载)
    • Excel 导入导出
    • 系统日志
    • MyBatis 数据库
    • MyBatis 联表&分页查询
    • 多数据源(读写分离)
    • Redis 缓存
    • 本地缓存
    • 异步任务
    • 配置管理
    • 工具类 Util
    • 单元测试
    • 分布式锁
    • 幂等性(防重复提交)
    • 数据库文档
    • 验证码
  • 中间件手册

    • 定时任务
    • 消息队列(内存)
    • 消息队列(Redis)
      • 1. 集群消费
        • 1.1 使用场景
        • 1.2 实现源码
        • 1.3 实战案例
        • 1.3.0 引入依赖
        • 1.3.1 Message 消息
        • 1.3.2 SmsProducer 生产者
        • 1.3.3 SmsSendConsumer 消费者
        • 1.3.4 简单测试
      • 2. 广播消费
        • 2.1 使用场景
        • 2.2 实现源码
        • 2.3 实战案例
    • 消息队列(RocketMQ)
    • 消息队列(RabbitMQ)
    • 消息队列(Kafka)
    • 限流熔断
  • 工作流手册

    • 工作流(Flowable)会签、或签
  • 指南
  • 中间件手册
EcoNets Tech
2024-01-23
目录

消息队列(Redis)

blossom-spring-boot-starter-mq (opens new window)技术组件,基于 Redis 实现分布式消息队列:

  • 使用 Stream (opens new window)特性,提供【集群】消费的能力。
  • 使用 Pub/Sub (opens new window)特性,提供【广播】消费的能力。

疑问:什么是【广播】消费?什么是【集群】消费?

参见《阿里云 —— 集群消费和广播消费 》 (opens new window)文档

# 1. 集群消费

集群消费,是指消息发送到 Redis 时,有且只会被一个消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

doc_econets_pic_175.png

友情提示:

如果你需要使用到【集群】消费,必须使用 Redis 5.0.0 以上版本,因为 Stream 特性是在该版本之后才引入噢!

# 1.1 使用场景

集群消费在项目中的使用场景,主要是提供可靠的、可堆积的异步任务的能力。例如说:

  • 短信模块,使用它异步 (opens new window)发送短信。
  • 邮件模块,使用它异步 (opens new window)发送邮件。

相比 《开发指南 —— 异步任务》 (opens new window) 来说,Spring Async 在 JVM 实例重启时,会导致未执行完的任务丢失。而集群消费,因为消息是存储在 Redis 中,所以不会存在该问题。

# 1.2 实现源码

集群消费基于 Redis Stream 实现:

  • 实现 AbstractRedisStreamMessage 抽象类,定义【集群】消息。
  • 使用 RedisMQTemplate 的 #send(message) 方法,发送消息。
  • 实现 AbstractRedisStreamMessageListener 接口,消费消息。

最终使用 RedisMQAutoConfiguration 配置类,扫描所有的 AbstractRedisStreamMessageListener 监听器,初始化对应的消费者。如下图所示:

doc_econets_pic_176.png

# 1.3 实战案例

以【短信发送】举例子,改造使用 Redis 作为消息队列,同时也是讲解集群消费的使用。如下图所示:

doc_econets_pic_177.png

# 1.3.0 引入依赖

在 blossom-module-system-biz 模块中,引入 blossom-spring-boot-starter-mq 技术组件。如下所示:

<dependency>
    <groupId>cn.econets.boot</groupId>
    <artifactId>blossom-spring-boot-starter-mq</artifactId>
</dependency>

# 1.3.1 Message 消息

在 message 包下,修改 SmsSendMessage 类,短信发送消息。代码如下:

@Data
public class SmsSendMessage extends AbstractRedisStreamMessage { // 重点:需要继承 AbstractRedisStreamMessage 类

    /**
     * 短信日志编号
     */
    @NotNull(message = "短信日志编号不能为空")
    private Long logId;
    /**
     * 手机号
     */
    @NotNull(message = "手机号不能为空")
    private String mobile;
    /**
     * 短信渠道编号
     */
    @NotNull(message = "短信渠道编号不能为空")
    private Long channelId;
    /**
     * 短信 API 的模板编号
     */
    @NotNull(message = "短信 API 的模板编号不能为空")
    private String apiTemplateId;
    /**
     * 短信模板参数
     */
    private List<KeyValue<String, Object>> templateParams;

}

# 1.3.2 SmsProducer 生产者

在 producer 包下,修改 SmsProducer 类,Sms 短信相关消息的生产者。代码如下:

@Slf4j
@Component
public class SmsProducer {

    @Resource
    private RedisMQTemplate redisMQTemplate; // 重点:注入 RedisMQTemplate 对象

    /**
     * 发送 {@link SmsSendMessage} 消息
     *
     * @param logId 短信日志编号
     * @param mobile 手机号
     * @param channelId 渠道编号
     * @param apiTemplateId 短信模板编号
     * @param templateParams 短信模板参数
     */
    public void sendSmsSendMessage(Long logId, String mobile,
                                   Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
        SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
        message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
        redisMQTemplate.send(message); // 重点:使用 RedisMQTemplate 发送消息
    }

}

# 1.3.3 SmsSendConsumer 消费者

在 consumer 包下,修改 SmsSendConsumer 类,SmsSendMessage 的消费者。代码如下:

@Component
@Slf4j
public class SmsSendConsumer extends AbstractRedisStreamMessageListener<SmsSendMessage> { // 重点:继承 AbstractRedisStreamMessageListener 类,并填写对应的 Message 类

    @Resource
    private SmsSendService smsSendService;

    @Override // 重点:实现 onMessage 方法
    public void onMessage(SmsSendMessage message) {
        log.info("[onMessage][消息内容({})]", message);
        smsSendService.doSendSms(message);
    }

}

# 1.3.4 简单测试

① Debug 启动后端项目,可以在 SmsProducer 和 SmsSendConsumer 上面打上断点,稍微调试下。

② 打开 SmsTemplateController.http 文件,使用 IDEA httpclient 发起请求,发送短信。如下图所示:

doc_econets_pic_178.png

如果 IDEA 控制台看到 [onMessage][消息内容 日志内容,说明消息的发送和消费成功。

# 2. 广播消费

广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

doc_econets_pic_179.png

# 2.1 使用场景

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

# 2.2 实现源码

广播消费基于 Redis Pub/Sub 实现:

  • 实现 AbstractChannelMessage 抽象类,定义【广播】消息。
  • 使用 RedisMQTemplate 的 #send(message) 方法,发送消息。
  • 实现 AbstractRedisChannelMessageListener 接口,消费消息。

最终使用 RedisMQAutoConfiguration 配置类,扫描所有的 AbstractRedisChannelMessageListener 监听器,初始化对应的消费者。如下图所示:

doc_econets_pic_180.png

# 2.3 实战案例

参见 《开发指南 —— 本地缓存》 (opens new window)

上次更新: 2024/02/18, 14:43:37
消息队列(内存)
消息队列(RocketMQ)

← 消息队列(内存) 消息队列(RocketMQ)→

Theme by Vdoing | Copyright © 2019-2024 EcoNets Tech | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式