王磊的个人技术记录 王磊的个人技术记录

记录精彩的程序人生

目录
MQ组件可选改造方案
/  

MQ组件可选改造方案

1 摘要

MQ组件可选改造方案是通过集成spring.cloud.stream框架实现,本文主要讲解了spring.cloud.stream基本情况,RabbitMQ如何迁移至spring.cloud.stream,以及对spring.cloud.stream框架扩展的实现原理。

本文档适应于以下场景:

  • 业务代码和特定MQ组件解耦
  • 通过配置切换不同MQ组件
  • 不想额外部署MQ中间件

下文 二、三、四章节主要介绍spring.cloud.stream框架以及如何从RabbitMQ切换到spring.cloud.stream框架;五、六章节是基于spring.cloud.stream做的扩展,以及如何使用这些扩展

2 背景

目前部分产品或者模块使用到了MQ,并且直接依赖了RabbitMQ;但是现在云部署项目越来越多,多样环境下安装RabbitMQ有的时候会变成一个紧急阻碍性的问题,另外下沉地方市场也是我们产品的一个趋势,当小项目不需要集群时,我们希望降低实施部署要求的同时可以不动业务代码,让项目依然可以使用我们的产品或者模块。

使用spring.cloud.stream解耦的优点:

  • 切换MQ中间件时不再需要修改代码。仅需修改配置文件
  • 业务代码可以不用关心具体使用的中间件
  • 能够支持不同的队列使用不同的MQ

3 spring.cloud.stream

3.1 简介

Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的轻量级框架 。应用通过Spring Cloud Stream插入的input(相当于 消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是往队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可

3.2 主要概念

image.png

从组件结构上,Spring Cloud Stream中有以下几个重要概念:

  • Destination Binders: 目标绑定器,目标是指kafka还是RabbitMQ,绑定器就是封装了目标中间件的包。
  • Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
  • Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
  • Input/Output : Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流

3.3 支持MQ列表

image.png

3.4 简单例子

  • 消息对象
public class TestMessage implements Serializable {
 private String body;

 public String getBody() {
     return body;
 }

 public void setBody(String body) {
     this.body = body;
 }
}
复制Error,复制失败!复制成功!
  • 生产者
 @Autowired
 private StreamBridge streamBridge; 

 @GetMapping("/stream/test")
 public void test() {
     TestMessage testMessage = new TestMessage();
     testMessage.setBody("发送的消息");
     // 生产消息
     // 第一个参数是绑定名称,格式为:自定义的绑定名称-out-0,myBroadcast是自定义的绑定名称,out代表生产者,0是固定写法
     // 自定义的绑定名称必须与消费方法的方法名保持一致
     // 第二个参数是发送的消息实体
     streamBridge.send("myBroadcast-out-0", testMessage);
     System.out.println("发送test消息完成");
 }复制Error,复制失败!复制成功!
  • 消费者
// 名称必须与生产消息时自定义的绑定名称一致
@Component("myBroadcast")
public class Concumer implements Consumer<Message<TestMessage{

 @Override
 public void accept(Message<TestMessage message) {
     TestMessage payload = message.getPayload();
     System.out.println("接收到消息:"+payload.getBody());
 }
}
复制Error,复制失败!复制成功!
  • 配置文件
spring:
cloud:
 function:
   # 定义消费者,多个用分号分隔 (系统中超过一个队列时必须定义否则会找不到)
   definition: myBroadcast
 stream:
   bindings:
     #   如何绑定通道,这里有个约定,开头是函数名,in表示消费消息,out表示生产消息,最后的数字是函数接受的参数的位置,destination后面为订阅的主题
     myBroadcast-in-0:
       destination: exchange   # 对应的真实的 RabbitMQ Exchange
       group: group # (如果不加为广播模式)对应队列,队列名为destination的值拼接上group值例:exchange.group
       binder: local_rabbit
     myBroadcast-out-0:
       destination: exchange   # 对应的真实的 RabbitMQ Exchange
       binder: local_rabbit
   binders:
     local_rabbit:
       type: rabbit
   defaultBinder:
     local_rabbit复制Error,复制失败!复制成功!
  • rabbitMq截图

image.png

image.png

3.5 消息分组

消息分组有2个好处,分别是集群 合理消费数据持久化

广播模式下一条消息会被集群内的所有节点监听到,并且执行对应的业务逻辑。很多情况我们希望一条消息只会被执行一次,在stream中是通过消息分组的概念实现的。

如果没有添加分组的时候,是处于广播模式下,此时如果监听的节点也就是消费者,在消息发出之后启动,消费节点是监听不到消息的,因为数据没有做持久化。如果添加了分组,即使消费节点在消息发出后启动也能收到消息。

  • 实现方式

image.png

3.6 消息分区

在Spring Cloud Stream中很容易使单个应用程序连接到消息中间件,但是更多的情况是多实例的应用程序,在实际的应用场景中,我们需要将同一种类型的消息,比如同一个用户,或者同一个类型的日志消息始终由同一个消费者消费并做统计,但是消息被分散到了不同的实例上去了, 就不好办了 。这个时候就可以使用消息分区了

  • Instance Index 和 Instance Count

当我们在采用集群的方式部署同一个应用时,每一个实例都可以接受到同一个应用有多少个实例数量,以及当前自己的实例在集群中的索引。Stream通过 spring.cloud.stream.instanceCount 实例数量和 spring.cloud.stream.instanceIndex 当前的实例索引实现这一点。如果实例总数instanceCount 是3,那么instanceIndex 索引从0开始到1、2 ,这两个属性的正确配置对于解决分区行为非常的重要,可以用来确保消息在多个实例之间正确的分割。

  • 生产者配置

spring.cloud.stream.bindings.demo_output.producer.partitionKeyExpression: 通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键

payload表示获取消息后,进行hash取值计算出分区的值

注: Spring表达式语言全称为“Spring Expression Language”,缩写为“SpEL”,类似于Struts2x中使用的OGNL表达式语言,能在运行时构建复杂表达式、存取对象图属性、对象方法调用等等,并且能与Spring功能完美整合,如能用来配置Bean定义

spring:
cloud:
stream:
 bindings:
    input:
       producer:
          partitionKeyExpression: payload
          partitionCount: 2
       destination: topic
       binder: local_rabbit

 binders:
    local_rabbit:
        type: rabbit
复制Error,复制失败!复制成功!
  • 消费者配置

表示开启分区

spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true

指定了当前消费者的总实例数量,当前的实例是2

spring.cloud.stream.instanceCount=2

设置当前实例的索引号,从 0 开始

spring.cloud.stream.instanceIndex=0

不同的实例instanceIndex根据情况配置

spring:
cloud:
stream:
 bindings:
    input:
      consumer:
          partitioned: true
       destination: topic
       binder: rabbit1
       group: group1
 binders:
    rabbit1:
        type: rabbit
instance-count: 2
instance-index: 0复制Error,复制失败!复制成功!

4 常用mq功能在stream下的对应配置

  • 广播

不设置分组默认为广播模式,所有节点都能消费信息

  • 队列

设置分组,保证一条消息只会被集群中的一个节点消费

  • 设置并发

消费者数量的配置为:spring.cloud.stream.bindings.<channelName.consumer

例如:spring.cloud.stream.bindings.demo.consumer.concurrency=5

spring:
cloud:
 stream:
   bindings:
     myBroadcast-in-0:
       destination: my-broadcast-topic
       group: reverse-ordergroup #队列,如果不加会是广播模式
       consumer.concurrency: 5
     myBroadcast-out-0:
       destination: my-broadcast-topic复制Error,复制失败!复制成功!
  • 手动ack

修改消费消息的方法:

消息是带有 Header 的,类似 Http 的 headler,我们可以通过 Header 来获取channel。

// 名称必须与生产消息时自定义的绑定名称一致
@Component("myBroadcast")
public class Concumer implements Consumer<Message<TestMessage{

 @Override
 public void accept(Message<TestMessage message) {
     Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
     Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
   TestMessage payload = message.getPayload();
     System.out.println("接收到消息:"+payload.getBody());
   try {
         channel.basicAck(deliveryTag,false);
   } catch (IOException e) {
         throw new RuntimeException(e);
     }
 }
}复制Error,复制失败!复制成功!

5 RabbitMQ注解迁移Stream示例

以BDE取数请求MQ为例

5.1 修改前

  • 生产者
@Autowired
 private RabbitTemplate rabbitTemplate;

 @Override
 public void requestDispatch(FetchTaskDTO fetchTaskDTO) {
     rabbitTemplate.convertAndSend(requestExchange, requestRoutingKey, JsonUtils.writeValueAsString(fetchTaskDTO));
 }
复制Error,复制失败!复制成功!
  • 消费者
 @RabbitListener(queues = "${fetch.request.queue}")
 public void doFetchRequest(Message message, Channel channel) {
     try {
         String msgStr = new String(message.getBody());
         FetchTaskDTO fetchTaskDTO = JsonUtils.readValue(msgStr, new TypeReference<FetchTaskDTO() {
         });
         fetchRequestHandle.doFetchRequest(fetchTaskDTO);
     } finally {
         try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
         log.error("ack异常:"+e.getMessage(), e);
        }
     }
 }复制Error,复制失败!复制成功!

5.2 修改后

  • 生产者
@Autowired
 private StreamBridge streamBridge;

 @Override
 public void requestDispatch(FetchTaskDTO fetchTaskDTO) {
     streamBridge.send("nrFetchRequest-out-0",        JsonUtils.writeValueAsString(fetchTaskDTO));
 }复制Error,复制失败!复制成功!
  • 消费者
@Component("nrFetchRequest")
public class NrFetchRequest implements Consumer<Message<String {

 @Autowired
 private NrFetchRequestHandle fetchRequestHandle;
 
 @Override
 public void accept(Message<String stringMessage) {
     try {
         Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
     Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
         String msgStr = stringMessage.getPayload();
         FetchTaskDTO fetchTaskDTO = JsonUtils.readValue(msgStr, new TypeReference<FetchTaskDTO() {
         });
         fetchRequestHandle.doFetchRequest(fetchTaskDTO);
     } finally {
        try {
         channel.basicAck(deliveryTag,false);
          } catch (IOException e) {
         throw new RuntimeException(e);
        }
     }
 }
}
复制Error,复制失败!复制成功!
  • 配置文件
spring:
cloud:
 function:
   # 定义消费者bean,多个用分号分隔
   definition: nrFetchRequest
 stream:
   bindings:
     #   如何绑定通道,这里有个约定,开头是函数名,in表示消费消息,out表示生产消息,最后的数字是函数接受的参数的位置,destination后面为订阅的主题
     nrFetchRequest-in-0:
       destination: nr-fetch-request-topic   # 对应的真实的 RabbitMQ Exchange
       binder: rabbit
       group: bde # 所属分组
     nrFetchRequest-out-0:
       destination: nr-fetch-request-topic   # 对应的真实的 RabbitMQ Exchange
       binder: local_rabbit
   binders:
     rabbit:
       type: rabbit
   defaultBinder:
     rabbit复制Error,复制失败!复制成功!

5.3 迁移注意事项

5.3.1 多产品线或者多模块配置文件配置

改造为stream框架后,生产者和消费者的关系需要在配置文件中配置。主要需要配置的为spring.cloud.function.definition和spring.cloud.stream.bindings

  • spring.cloud.stream.bindings

实际开发中用到消息中间件的可能会有很多模块,如果所有的都在yml文件中配置会经常改动yml文件不利于开发。spring.cloud.stream.bindings值是用map收集的,是可以在多个模块中分别配置,stream框架最终是都能读取到的

例如:

a.在当前模块创建一个xxx.properties用于配置当前模块中用到的消息信息

image.png

b.在当前模块的配置类上添加注解@PropertySource(value = "classpath:bde-test.properties")将新加的配置文件引入到环境中

image.png

  • spring.cloud.function.definition

当队列大于一个时,需要把消费者bean名称配置到definition中间用;分割。如果有多个配置文件中都配置了这个值,会按照springboot的配置文件优先级,优先级高的覆盖掉低的。所以不利于多模块配置。这里提供两种处理方式思路

a.合并产品线

添加一个自定义接口,消费者bean都实现该接口,服务启动时收集到所有消费者bean,添加到系统参数中

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class EntStreamEnvironment implements InitializingBean {

 @Override
 public void afterPropertiesSet() {
     ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
     String[] beanNamesForType = applicationContext.getBeanNamesForType(EntConsumer.class);
     StringJoiner joiner = new StringJoiner(";");
     for (String beanName : beanNamesForType) {
         joiner.add(beanName);
     }
     Environment environment = applicationContext.getEnvironment();
     String name = environment.getProperty(FunctionProperties.FUNCTION_DEFINITION);
     name = StringUtils.isEmpty(name) ? joiner.toString() : name + ";" + joiner;
     ((StandardEnvironment) environment).getSystemProperties().put(FunctionProperties.FUNCTION_DEFINITION, name);
 }
}
复制Error,复制失败!复制成功!

b.nvwa胶水层

服务启动时获取对应参数,判断是否有自己的消费者,如果没有将自己的消费者信息添加到系统参数中

com.jiuqi.nvwa.starter.glue.env.GlueEnvironment

@Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        String defaultBinder = environment.getProperty("spring.cloud.stream.defaultBinder", "");
        if(ObjectUtils.isEmpty(defaultBinder)) {
            return;
        }
//        没有FunctionProperties.FUNCTION_DEFINITION配置时,直接set glueConsumer,
//        有配置时,判断值是否包含 glueConsumer,没有则追加上,用‘;’拼接
//        声明此Function作为可绑定功能bean
        String functionNames = environment.getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
        if(ObjectUtils.isEmpty(functionNames)) {
            functionNames = "glueConsumer";
        }else if(!functionNames.contains("glueConsumer")) {
            functionNames+=";glueConsumer";
        }
      
        Map<String, Object map = new HashMap<(1);
        map.put("spring.cloud.stream.bindings.glueConsumer-in-0.group", "default");
     map.put(FunctionProperties.FUNCTION_DEFINITION, functionNames);
     PropertySource source = new MapPropertySource("glueEnv", map);
     environment.getPropertySources().addFirst(source);
    }复制Error,复制失败!复制成功!

5.3.2 新队列名跟旧队列名不一致

spring.cloud.stream的队列名是用spring.cloud.stream.bindings.xxx.destination 的值拼上点拼上spring.cloud.stream.bindings.xxx.group的值作为队列名称的。可能跟之前已有的队列名称不一致。

如果跟旧队列名称不一致,原有项目升级时需要保证旧队列的消息都消费完,避免因升级后队列名不一致导致有消息没能消费成功。

6 使用扩展方式

基于spring.cloud.stream扩展了ActiveMQ和数据库模式,引用对应依赖后,通过配置文件切换至ActiveMQ或者数据库模式,即可实现不引入单独部署的MQ组件也能正常使用对应业务。

6.1 引用依赖

implementation 'com.jiuqi.ent.mq-stream:common-starter-spring-cloud-stream:6.9.0'复制Error,复制失败!复制成功!

6.2 使用ActiveMQ模式

配置文件中添加以下配置

#                      ActiveMQ配置                        #
############################################################
# ActiveMQ服务地址
spring.activemq.broker-url=tcp://localhost:61616?jms.useAsyncSend=true
spring.activemq.user=admin
spring.activemq.password=activemq

#  Activemq监控界面配置hawtio配置开始
# 监控端口  如下配置可访问http://ip:9091/hawtio/console页面监控ActiveMQ消费情况
management.server.port=9091
management.server.base-path=/
management.endpoints.web.exposure.include=configprops,env,health,info,threaddump,logfile,hawtio,jolokia
management.endpoints.web.base-path=/
management.endpoints.web.path-mapping.hawtio=/hawtio/console
endpoints.jolokia.sensitive=false
hawtio.authenticationEnabled=false
hawtio.offline=true
#  ActiveMQ监控界面配置结束

# ActiveMQ相关配置,一般不需要改
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
spring.activemq.pool.idle-timeout=30000
spring.activemq.jms.pub-sub-domain=false

# spring cloud stream 切换到active中间件
spring.cloud.stream.default-binder=active
spring.cloud.stream.binders.active.type=active

############################################################复制Error,复制失败!复制成功!

6.3 使用数据库模式

配置文件中添加以下配置

############################################################
#                      数据库调度mq配置                        #
############################################################
#spring cloud stream 切换到使用数据库消息调度
spring.cloud.stream.default-binder=entdb
spring.cloud.stream.binders.entdb.type=entdb
############################################################复制Error,复制失败!复制成功!

7 对于spring.cloud.stream的扩展优化原理

7.1 优化点

  • 提供收集消费者bean对象自动往spring.cloud.function.definition添加功能
  • 启用手动ack时,提供工具类做ack
  • 提供stream集成ActiveMQ相关jar包
  • 基于stream框架实现一套基于数据库的消息调度框架

7.2 实现原理

7.2.1 手动ack工具类

对于常用的消息组件每个写一个ack的bean。bean根据配置文件中配置的消息中间件类型条件注入,保证只有一个ack实现被spring收集。封装的框架会提供一个EntManualAckUtil工具类,提供一个ack方法,方法里面获取到对应的bean调用对应组件的ack方法。

@Component("nrFetchRequest")
public class NrFetchRequest implements Consumer<Message<String {

@Autowired
private NrFetchRequestHandle fetchRequestHandle;

@Override
public void accept(Message<String stringMessage) {
   try {
       String msgStr = stringMessage.getPayload();
       FetchTaskDTO fetchTaskDTO = JsonUtils.readValue(msgStr, new TypeReference<FetchTaskDTO() {
       });
       fetchRequestHandle.doFetchRequest(fetchTaskDTO);
   } finally {
        EntManualAckUtil.ack(message);
   }
}
}复制Error,复制失败!复制成功!

7.2.2 spring.cloud.stream集成ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行 ,简单项目或者信创环境下可以退而求其次使用ActiveMQ来替代RabbitMQ。因为ActiveMQ是纯java程序,所以部署难度会大大降低。

7.2.2.1 内嵌启动ActiveMQ

ActiveMQ有两种部署方式,一种是单独部署,直接在官网下载对应安装包安装即可。另一种是可以嵌入到当前产品中随服务启动。后续会将内嵌启动封装成单独的jar包,如需使用,直接引入对应jar包即可,下面介绍嵌入部署的源码,其它产品线也可参照自行进行内嵌启动。

(1)引入依赖

api 'org.springframework.boot:spring-boot-starter-activemq'
// ActiveMQ 服务器内嵌启动时,需要添加 activemq-kahadb-store,用于数据持久化
api 'org.apache.activemq:activemq-kahadb-store'
// ActiveMQ 鉴权
api 'org.apache.activemq:activemq-jaas'
// ActiveMQ监控页面。 内嵌启动方式没有ActiveMQ-web监控页面。所以集成官方文档中推荐的另一个监控工具hawtio
api 'io.hawt:hawtio-springboot:2.5.0'复制Error,复制失败!复制成功!

(2)内嵌启动代码

@Component
public class ActiveMQStart implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(ActiveMQStart.class);
@Override
public void run(ApplicationArguments args) throws Exception {
   try {
       logger.info("应用启动......");
       /**设置 ActiveMQ 消息服务器用于被客户端连接的 url 地址*/
       String serviceURL = "tcp://localhost:61616";
       /**BrokerService 表示 ActiveMQ 服务,每一个 BrokerService 表示一个消息服务器实例
            * 如果想启动多个,只需要 start 多个不同端口的 BrokerService 即可*/
       BrokerService brokerService = new BrokerService();
       brokerService.setBrokerName("GC-ActiveMQ");
       // 鉴权插件
       brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()});
       //为指定地址添加新的传输连接器
       brokerService.addConnector(serviceURL);
       /**启动 ActiveMQ 服务,此时客户端便可以使用提供的地址进行连接,然后发送消息过来,或者从这里消费消息。
            * 注意:这里内嵌启动后,默认是没有提供 8161 端口的 web 管理界面的,照样能做消息中间件使用*/
       brokerService.start();
       logger.info("启动内嵌 ActiveMQ 服务器完成......");
   } catch (Exception e) {
       logger.error("启动内嵌 ActiveMQ 服务器失败...",e);
   }
}
}复制Error,复制失败!复制成功!

(3)添加配置文件

resource下增加login.config users.properties groups.properties文件用于配置鉴权信息。如启动时没有添加鉴权插件,可以不配置。

  • login.config
/*配置可参考类注解:org.apache.activemq.security.JaasDualAuthenticationBroker*/
activemq-domain {
 org.apache.activemq.jaas.PropertiesLoginModule required
     org.apache.activemq.jaas.properties.user="users.properties"
     org.apache.activemq.jaas.properties.group="groups.properties";
};复制Error,复制失败!复制成功!
  • groups.properties
## 用户分组信息
admins=admin复制Error,复制失败!复制成功!
  • users.properties
## 用户名=密码
admin=activemq复制Error,复制失败!复制成功!

(4)配置文件添加hawtio监控相关配置

##ActiveMQ界面配置开始
# ActiveMQ JMX
management:
server:
port: 9091
base-path: /
endpoints:
web:
 exposure:
   include: ["configprops", "env", "health", "info", "threaddump", "logfile", "hawtio", "jolokia"]
 base-path: /
 path-mapping:
   hawtio: /hawtio/console

# 禁用Jolokia端点的敏感性:
endpoints:
jolokia:
sensitive: false

# 不启用hawtio验证
hawtio:
authenticationEnabled: false
offline: true
##ActiveMQ界面配置结束
复制Error,复制失败!复制成功!

服务启动后访问 ip:端口号/hawtio/console访问监控界面

7.2.2.2 stream集成ActiveMQ

spring.cloud.stream默认是没有集成ActiveMQ中间件的,所以需要我们自己做ActiveMQ的集成。消息的生产者消费者需要集成该模块。后续会将集成代码封装成单独的jar包,如需使用,直接引入对应jar包即可,下面介绍集成ActiveMQ的源码,其它产品线也可参照自行实现。

(1)添加依赖

    api 'org.springframework.cloud:spring-cloud-stream'
api 'org.springframework.boot:spring-boot-starter-activemq'复制Error,复制失败!复制成功!

(2)添加配置文件

resources/META-INF文件夹下添加spring.binders文件。文件中指向Binder的配置类

文件内容示例:

# active对应yml配置文件中的spring.cloud.stream.binders.***.type的值
active:\
com.jiuqi.common.taskschedule.activemq.binder.EntTaskScheduleActiveMQBinderAutoConfiguration
复制Error,复制失败!复制成功!

(3)实现 Binder 接口 - 实现消息发送/消费

public class ActiveMQMessageChannelBinder implements Binder<MessageChannel, ConsumerProperties, ProducerProperties {
private static final Logger logger = LoggerFactory.getLogger(ActiveMQMessageChannelBinder.class);

@Autowired
private JmsTemplate jmsTemplate;

/**
    * 接受 ActiveMQ 消息
*
    * @param name
    * @param group
    * @param inputChannel
    * @param consumerProperties
    * @return
*/
@Override
public Binding<MessageChannel bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {
   SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
   container.setConnectionFactory(jmsTemplate.getConnectionFactory());
   container.setDestinationName(name);
   container.setSessionTransacted(true);
   container.setConcurrentConsumers(consumerProperties.getConcurrency());
   container.setMessageListener((MessageListener) message - {
       // message 来自于 ActiveMQ
       Object object = null;
       if (message instanceof ActiveMQBytesMessage) {
           ActiveMQBytesMessage objectMessage = (ActiveMQBytesMessage) message;
           try {
               byte[] bytes = new byte[(int) objectMessage.getBodyLength()];
               objectMessage.readBytes(bytes);
               object = new String(bytes);
           } catch (JMSException e) {
               e.printStackTrace();
           }
       }
       if (message instanceof ActiveMQTextMessage) {
           ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;
           try {
               object = objectMessage.getText();
           } catch (JMSException e) {
               e.printStackTrace();
           }
       }
       if (message instanceof ObjectMessage) {
           ObjectMessage objectMessage = (ObjectMessage) message;
           try {
               object = objectMessage.getObject();
           } catch (JMSException e) {
               e.printStackTrace();
           }
       }
       inputChannel.send(new GenericMessage(object));
   });
   container.start();
   return () - {
   };
}

/**
    * 负责发送消息到 ActiveMQ
*
    * @param name
    * @param outputChannel
    * @param producerProperties
    * @return
*/
@Override
public Binding<MessageChannel bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) {
   Assert.isInstanceOf(SubscribableChannel.class, outputChannel, "Binding is supported only for SubscribableChannel instances");
   SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;
   DirectWithAttributesChannel aaa = (DirectWithAttributesChannel) outputChannel;
   aaa.addInterceptor(new ChannelInterceptor() {
       @Override
       public Message<? preSend(Message<? message, MessageChannel channel) {
           return ChannelInterceptor.super.preSend(message, channel);
       }
       @Override
       public void afterSendCompletion(Message<? message, MessageChannel channel, boolean sent, Exception ex) {
           ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
       }
   });
   subscribableChannel.subscribe(message - {
       jmsTemplate.convertAndSend(name, message.getPayload());
   });
   return () - {
       logger.info("Unbinding");
   };
}
}
复制Error,复制失败!复制成功!

(4)添加对应binder配置类暴露给steam框架上面添加的binder处理类

@Configuration
@ConditionalOnMissingBean(Binder.class)
public class EntTaskScheduleActiveMQBinderAutoConfiguration {
/**
    * 暴露自定义的 ActiveMQMessageChannelBinder Bean
*/
@Bean
public ActiveMQMessageChannelBinder activeMQMessageChannelBinder(@Nullable JmsTemplate jmsTemplate) {
   return new ActiveMQMessageChannelBinder(jmsTemplate);
}
}复制Error,复制失败!复制成功!

7.2.3 基于stream框架实现基于数据库的消息调度框架

7.2.3.1 数据库调度模式实现原理

(1)表结构

-- 任务调度 任务信息表
CREATE TABLE ENT_TASK_MESSAGE(
ID                 VARCHAR(36) NOT NULL,  --主键
VER                NUMBER(18),  -- 更新时间 时间戳  乐观锁
QUEUE_NAME         NVARCHAR(100) NOT NULL, --队列名
MESSAGE_BODY       NVARCHAR(1000) , -- 消息体
STATUS             NUMBER(1) NOT NULL, --状态
CREATE_TIME        TIMESTAMP(6), --创建时间
START_TIME        TIMESTAMP(6), --消息执行开始时间
END_TIME        TIMESTAMP(6), --消息执行结束时间
EXECUTOR_IP        VARCHAR(20),            -- 执行任务的主机ip
STORE_TYPE         NUMBER(1) -- 是否存在当前表
);
ALTER TABLE ENT_TASK_MESSAGE ADD CONSTRAINT PK_ENT_TASK_MESSAGE_ID PRIMARY KEY (ID);
CREATE INDEX IDX_ENT_TASK_QUEUE_STATUS ON ENT_TASK_MESSAGE (QUEUE_NAME,STATUS);
CREATE INDEX IDX_ENT_TASK_UPDATE ON ENT_TASK_MESSAGE (VER);
-- 任务调度 任务信息clob表
CREATE TABLE ENT_TASK_MESSAGE_CLOB(
ID                 VARCHAR(36) NOT NULL, --任务信息表的id
MESSAGE_BODY       CLOB --大字段
);
ALTER TABLE ENT_TASK_MESSAGE_CLOB ADD CONSTRAINT PK_ENT_TASK_MESSAGE_CLOB_ID PRIMARY KEY (ID);

-- 任务调度 任务失败结果表
CREATE TABLE ENT_TASK_MESSAGE_ERRORRESULT(
ID                 VARCHAR(36) NOT NULL, --任务信息表的id
ERRORRESULT        CLOB --大字段
);
ALTER TABLE ENT_TASK_MESSAGE_ERRORRESULT ADD CONSTRAINT ENT_TASK_MESSAGE_ERRORRESULT_ID PRIMARY KEY (ID);

-- 任务调度 任务信息历史数据表
CREATE TABLE ENT_TASK_MESSAGE_HISTORY(
ID                 VARCHAR(36) NOT NULL,  --主键
VER                NUMBER(18),  -- 更新时间 时间戳  乐观锁
QUEUE_NAME         NVARCHAR(100) NOT NULL, --队列名
MESSAGE_BODY       NVARCHAR(1000) , -- 消息体
STATUS             NUMBER(1) NOT NULL, --状态
CREATE_TIME        TIMESTAMP(6), --创建时间
START_TIME        TIMESTAMP(6), --消息执行开始时间
END_TIME        TIMESTAMP(6), --消息执行结束时间
EXECUTOR_IP        VARCHAR(20),            -- 执行任务的主机ip
STORE_TYPE         NUMBER(1) -- 是否存在当前表
);
ALTER TABLE ENT_TASK_MESSAGE_HISTORY ADD CONSTRAINT PK_ENT_TASK_MESSAGE_HISTORY_ID PRIMARY KEY (ID);
CREATE INDEX IDX_ENT_TASK_MESSAGE_H_UPDATE ON ENT_TASK_MESSAGE_HISTORY (UPDATE_TIME);
复制Error,复制失败!复制成功!

(2)调度逻辑

  • 调度线程

服务启动时,首先更新时间超过五分钟的执行中的消息重置为待执行(保证服务异常宕机导致正在执行的消息能够被重新执行到)然后启动一个线程循环执行。

a.更新当前节点未完成消息的更新时间(配合服务启动的五分钟检查,保证只要服务是正常启动的执行中的消息每次循环的时候更新时间都会更新)

b.根据当前节点队列名消费每个队列最早插入数据库的一条消息。(使用乐观锁保证只有一个节点消费)

c.将获取到的消息,放到对应的队列线程池中执行

d.sleep(500)

  • 发送消息

接入stream框架,发送消息时往数据库中插入一条记录

  • 绑定消费者

接入stream框架,绑定消费者的接口中,做对应的队列线程池初始化

  • 历史数据清理

启动定时任务,超过7天的数据搬移到历史数据表。超过15天的数据从历史数据表中清除

7.2.3.2 spring.cloud.stream集成DB模式

(1)添加依赖

    api 'org.springframework.cloud:spring-cloud-stream'
api 'com.jiuqi.ent.mq-stream:common-db-spring-cloud-stream'复制Error,复制失败!复制成功!

(2)添加配置文件

resources/META-INF文件夹下添加spring.binders文件。文件中指向Binder的配置类

文件内容示例:

# entdb对应yml配置文件中的spring.cloud.stream.binders.***.type的值
entdb:\
com.jiuqi.common.taskschedule.streamdb.binder.EntTaskScheduleDbBinderAutoConfiguration
复制Error,复制失败!复制成功!

(3)实现 Binder 接口 - 实现消息发送/消费

public class DbMessageChannelBinder implements Binder<MessageChannel, ConsumerProperties, ProducerProperties {
private static final Logger logger = LoggerFactory.getLogger(DbMessageChannelBinder.class);

public DbMessageChannelBinder(EntDbTaskSendTemplate dbTaskSendTemplate) {
   this.dbTaskSendTemplate = dbTaskSendTemplate;
}

private EntDbTaskSendTemplate dbTaskSendTemplate;

/**
    * 接受 MQ 消息
*
    * @param name
    * @param group
    * @param inputChannel
    * @param consumerProperties
    * @return
*/
@Override
public Binding<MessageChannel bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {
   SpringContextUtils.getBean(EntTaskDbInitPoolHandle.class).addQueueToPool(name, consumerProperties.getConcurrency());
   EntTaskExecuteCollect.addListenerNameToFunctionMap(name, (message) -
           inputChannel.send(new GenericMessage(message))
   );
   return () - {
   };
}

/**
    * 负责发送消息到 MQ
*
    * @param name
    * @param outputChannel
    * @param producerProperties
    * @return
*/
@Override
public Binding<MessageChannel bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) {
   Assert.isInstanceOf(SubscribableChannel.class, outputChannel, "Binding is supported only for SubscribableChannel instances");
   SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;
   DirectWithAttributesChannel aaa = (DirectWithAttributesChannel) outputChannel;
   aaa.addInterceptor(new ChannelInterceptor() {
       @Override
       public Message<? preSend(Message<? message, MessageChannel channel) {
           return ChannelInterceptor.super.preSend(message, channel);
       }
       @Override
       public void afterSendCompletion(Message<? message, MessageChannel channel, boolean sent, Exception ex) {
           ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
       }
   });
   subscribableChannel.subscribe(message - {
       long a = System.currentTimeMillis();
       Object payload = message.getPayload();
       if (payload instanceof byte[]) {
           payload = new String((byte[]) payload);
       }
       dbTaskSendTemplate.convertAndSend("", "", name, payload);
       System.out.println("---DB------"+(System.currentTimeMillis()-a));
   });
   return () - {
       logger.info("Unbinding");
   };
}
}复制Error,复制失败!复制成功!

(4) 添加对应binder配置类暴露给steam框架上面添加的binder处理类

@Configuration
@ConditionalOnMissingBean(Binder.class)
public class EntTaskScheduleDbBinderAutoConfiguration {
@Autowired
private EntDbTaskSendTemplate dbTaskSendTemplate;

/**
    * 暴露自定义的 ActiveMQMessageChannelBinder Bean
*/
@Bean
public DbMessageChannelBinder activeMQMessageChannelBinder() {
   return new DbMessageChannelBinder(dbTaskSendTemplate);
}
}复制Error,复制失败!复制成功!

7.3 不同消息中间件对比

RabbitMQ的各方面性能均为最优选择,如果能使用RabbitMQ的项目建议优先选择RabbitMQ,用不了RabbitMQ的在根据实际需要自行选择。

RabbitMQActiveMQDB
开发语言erlangjavajava
单机吞吐量万级官方文档说是万级,jmeter测试每秒六七百jmeter测试每秒五百多
队列
广播可开发支持
消息确认
消息持久化
监控界面
资料完善较为完善,但是资料有些都比较旧。刚开始 较少
成熟度成熟创建已久,较为成熟刚开始
定制化需求开发难度社区活跃,基本需要的场景应该都支持不一定能集成,需要查看源码容易定制化开发
版本维护方式Pivotal公司(Spring,Tomcat,Redis都是他家的)apache自己维护
部署方式单独部署可嵌入产品,可单独部署目前设计不能单独部署,只能集成在产品中
问题排查方式社区活跃基本问题应该都能搜到社区不活跃,根据现象查资料,可能会比较耗时有源码能调试分析
认证支持可以配置用户密码等鉴权信息直接入库,无需校验
使用场景定位大型项目或者非信创环境首选信创单节点;集群下可以使用,但是需要单独部署或者指定一个服务作为mq节点,存在服务不可用风险。云环境下因为服务地址有可能不固定,使用有风险单节点,集群,云环境都可以使用,只要有数据库就能用

8 参考资料

8.1 spring-cloud-stream官网:

https://spring.io/projects/spring-cloud-stream

基本介绍:https://docs.spring.io/spring-cloud-stream/docs/3.2.5-SNAPSHOT/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference

8.2 其它资料:

8.2.1 概念介绍

8.2.2 集成ActiveMQ


标题:MQ组件可选改造方案
作者:wangduidui
地址:https://www.wangleijava.com/articles/2025/07/16/1752674481270.html