1、什么是分布式消息队列?
- 消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于 数据通信 来进行分布式系统的集成。
 - 通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等等功能,其作为 分布式系统架构 中的一个重要组件,有着举足轻重的地位。
 
2、消息队列--历程与区别
2.1、发布历程
下图根据时间线展示了不同时间点产生的消息队列产品,主要的产品有:
2.2、各个中间件的区别

3、中间件--详解
3.1、Apache Kafka
- 简介
Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效、可扩展良好 并且 可持久化。它的 分区特性,可复制 和 可容错 都是其不错的特性。 - 架构图

 

基本术语
- Producer:消息生产者。
 - Topic:Topic是个抽象的虚拟概念,一个集群可以有多个Topic,作为一类消息的标识。
 - Partition:Partition是个物理概念,一个Topic对应一个或多个Partition。
 - Replicas:一个Partition有多个Replicas副本。
 - Consumer:消息读取者。消费者订阅主题,并按照一定顺序读取消息。
 - Offset:偏移量是一种元数据,是不断递增的整数。
 - Broker:独立的Kafka服务器。
 
主要特性
- 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化;
 - 高吞吐:在一台普通的服务器上既可以达到 10W/s 的 吞吐速率;
 - 完全的分布式系统:Broker、Producer 和 Consumer 都原生自动支持 分布式,自动实现 负载均衡;
 - 支持 同步 和 异步 复制两种 高可用机制;
 - 支持 数据批量发送 和 拉取;
 - 零拷贝技术(zero-copy):减少 IO 操作步骤,提高 系统吞吐量;
 - 数据迁移、扩容 对用户透明;
 - 无需停机 即可扩展机器;
 - 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;
 
优点
- 客户端语言丰富:支持 Java、.Net、PHP、Ruby、Python、Go 等多种语言;
 - 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
 - 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性 和 可靠性,理论上支持 消息无限堆积;
 - 支持批量操作;
 - 消费者 采用 Pull方式 获取消息。消息有序,通过控制 能够保证所有消息被消费且仅被消费一次;
 - 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;
 - 在 日志领域 比较成熟,被多家公司和多个开源项目使用。
 
缺点
- Kafka 单机超过64个 队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长;
 - 使用 短轮询方式,实时性 取决于 轮询间隔时间;
 - 消费失败 不支持重试;
 - 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序;
 - 社区更新较慢。
 
使用场景
- 日志收集:大量的日志消息先写入kafka,数据服务通过消费kafka消息将数据落地;
 - 消息系统:解耦生产者和消费者、缓存消息等;
 - 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动
 - 运营指标:记录运营、监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈。
 - 流式处理:比如spark streaming
 
3.2、RabbitMQ
- 简介
RabbitMQ 是实现了 高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件(英语:Message-oriented middleware))。RabbitMQ服务器是用 Erlang 语言 编写的,而 群集和故障转移 是构建在开放电信平台框架上的。所有主要的编程语言均有与 代理接口通讯 的客户端函式库。 - 架构图

 基本术语
- Broker:接收客户端链接实体,实现AMQP消息队列和路由功能;
 - Virtual Host:是一个虚拟概念,权限控制的最小单位。
 - Exchange:接收消息生产者的消息并将消息转发到队列。
 - Message Queue:消息队列,存储为被消费的消息;
 - Message:由Header和Body组成,Header是生产者添加的各种属性;
 - Binding:Binding连接起了Exchange和Message Queue。
 - Connection:在Broker和客户端之间的TCP连接;
 - Channel:信道。Broker和客户端只有tcp连接是不能发送消息的,必须创建信道
 - Command:AMQP命令,客户端通过Command来完成和AMQP服务器的交互。
 
优点
- 基于AMQP协议:除了Qpid,RabbitMQ 是唯一一个 实现了AMQP标准的消息服务器;
 - 健壮、稳定、易用;
 - 社区活跃,文档完善;
 - 支持定时消息;
 - 可插入的身份验证,授权,支持TLS和LDAP;
 - 支持根据消息标识查询消息,也支持根据消息内容查询消息。
 
缺点
- erlang 开发源码难懂,不利于做二次开发和维护;
 - 接口和协议复杂,学习和维护成本较高。
 
3.3、Apache ActiveMQ
- 简介
ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持 JMS 1.1 和 J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端 和 协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。 - 架构图

 主要特性
- 服从JMS规范:JMS 规范提供了良好的标准和保证,包括:同步 或 异步 的消息分发,一次和仅一次的消息分发,消息接收 和 订阅 等等。
 - 连接灵活性:ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/S,IP多播,SSL,TCP,UDP 等等。
 - 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP;
 - 持久化插件和安全插件:ActiveMQ 提供了 多种持久化 选择。
 - 支持的客户端语言种类多:除了 Java 之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
 - 代理集群:多个 ActiveMQ代理 可以组成一个 集群 来提供服务;
 - 异常简单的管理:ActiveMQ 是以开发者思维被设计的。
 
优点
- 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
 - 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
 - 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
 - 支持 自动重连 和 错误重试机制;
 - 有安全机制:支持基于 shiro,jaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权;
 - 监控完善:拥有完善的 监控,包括 Web Console,JMX,Shell 命令行,Jolokia 的 RESTful API;
 - 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio;
 
缺点
- 社区活跃度不及 RabbitMQ 高;
 - 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息;
 - 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
 - 不适合用于 上千个队列 的应用场景;
 
3.4、RocketMQ
- 简介
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。 - 架构图

 基本术语
- Topic:一个Topic可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的Topic发送消息。
 - Tag:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有tag;
 - Producer:消息生产者;
 - Broker:存储消息,以Topic为纬度轻量级的队列;
 - Consumer:消息消费者,负责接收并消费消息;
 - MessageQueue:消息的物理管理单位,一个Topic可以有多个Queue,Queue的引入实现了水平扩展的能力;
 - NameServer:负责对原数据的管理,包括Topic和路由信息,每个NameServer之间是没有通信的;
 - Group:一个组可以订阅多个Topic,ProducerGroup、ConsumerGroup分别是一类生产者和一类消费者;
 - Offset:通过Offset访问存储单元,RocketMQ中所有消息都是持久化的,且存储单元定长。
 - Consumer:支持PUSH和PULL两种消费模式,支持集群消费和广播消费。
 
优点
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型;
 - 顺序队列:在一个队列中可靠的先进先出(FIFO)和严格的顺序传递;
 - 支持拉(pull)和推(push)两种消息模式;
 - 单一队列百万消息的堆积能力;
 - 支持多种消息协议,如 JMS、MQTT 等;
 - 分布式横向扩展架构
 - 满足至少一次消息传递语义;
 - 提供丰富的Dashboard,包含配置、指标和监控等。
 
缺点
- 支持的客户端语言不多,目前是java及c++,其中c++不成熟
 - 社区活跃度一般
 - 延时消息:开源版不支持任意时间精度,仅支持特定的level
 
3.5、Apache Pulsar
- 简介
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。 - 架构图

 基本术语
- Property:代表租户,每个property都可以代表一个团队、一个功能、一个产品线。
 - Namespace:Pulsar的基本管理单元,在namaspace级别可设置权限、消息TTL、Retention 策略等。
 - Producer:数据生产方,负责创建消息并将消息投递到 Pulsar 中;
 - Consumer:数据消费方,连接到 Pulsar接收消息并进行相应的处理;
 - Broker:无状态Proxy服务,负责接收消息、传递消息、集群负载均衡等操作;
 - BookKeeper:有状态,负责持久化存储消息。
 - ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调、服务发现等;
 - Topic:用作从producer到consumer传输消息。
 - Ledger:即Segment,Pulsar底层数据以Ledger的形式存储在BookKeeper上。
 - Fragment : 每个 Ledger 由若干 Fragment 组成。
 
优点
- 灵活扩容
 - 无缝故障恢复
 - 支持延时消息
 - 内置的复制功能,用于跨地域复制如灾备
 - 支持两种消费模型:流(独享模式)、队列(共享模式)
 
- 缺点
暂无 
3.6、综上所述
- Kafka 在于 分布式架构,
 - RabbitMQ 基于 AMQP 协议 来实现,
 - RocketMQ 的思路来源于 Kafka,改成了 主从结构,在 事务性 和 可靠性 方面做了优化。
 - 广泛来说,电商、金融 等对 事务一致性 要求很高的,可以考虑 RabbitMQ 和 RocketMQ,
 - 对 性能要求高 的可考虑 Kafka。
 
4、举例说明
由于仅仅是demo,默认安装部署都会,就不再提了。下面只展示 Kafka 和 RocketMQ 的,其他都大同小异。
4.1、Kafka Demo
在用户添加成功的时候发送消息,将添加的参数作为消息body发送,控制层防止重复提交添加;监听器里接收消息,使用redisson分布式锁防止重复消费,使用ack手动确认,防止消息丢失;将接收到的消息存到刚插入的那行数据中。
4.1.1、部分效果
- 控制台信息

 - 数据库

 
4.1.2、配置文件
- pom.xml
 
<dependencies>
        <!--kafka的依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--分布式锁-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.23.1</version>
        </dependency>
        <!--spring boot 测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--spring web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--hutool 工具类-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.22</version>
        </dependency>
        <!--mysql 驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--mybatis plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.2</version>
        </dependency>
        <!--redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>- application.yml
 
server:
    port: 9200
spring:
    application:
        name: llh-contract    # 注册到eureka上面的应用名称
    # Kafka配置
    kafka:
        bootstrap-servers: 192.168.126.137:9092     # 指定kafka地址,可以多个
        #        bootstrap-servers: 192.168.126.134:9092,192.168.126.135:9092,192.168.126.136:9092     # 指定kafka地址,可以多个
        consumer:
            enable-auto-commit: false   # 关闭自动提交
            group-id: test-consumer-group
            auto-offset-reset: latest      # 从当前时间开始接收
            auto-commit-interval: 100
            # 指定消息key和value的编码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
            # 0表示不进行消息接收是否成功的确认,1表示当Leader接收成功时确认,-1表示Leader和Follower都接收成功时确认
            acks: 1
            # 重试机制
            retries: 1
            # 每次批量发送消息的数量
            batch-size: 16384
            buffer-memory: 3355443
            # 指定消息key和value的编码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # redis配置
    redis:
        host: 127.0.0.1
        port: 6379
    # 数据源
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/activiti?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&nullCatalogMeansCurrent=true
        username: root
        password: root
# mybatis plus配置
mybatis-plus:
    mapper-locations: classpath*:mapper/*.xml
    configuration:
        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# 日志管理
logging:
    level:
        root: info- KafkaConfig.java
 
package com.llh.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
/**
 * User: lilinhan
 * DateTime: 2023/11/14 16:26
 */
@Component
@Configuration
public class KafkaConfig {
    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}4.1.3、部分代码
- 控制层:UserController.java
 
package com.llh.controller;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONUtil;
import com.llh.domain.User;
import com.llh.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
 * User: lilinhan
 * DateTime: 2023/11/22 16:26
 */
@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    RedisTemplate redisTemplate;
    @Autowired
    UserService userService;
    @Autowired
    KafkaTemplate kafkaTemplate;
    @RequestMapping("/save")
    public String save(User user){
        String jsonStr = JSONUtil.toJsonStr(user);
        String md5 = DigestUtil.md5Hex(jsonStr);
        // 重复提交
        Boolean b = redisTemplate.opsForValue().setIfAbsent(md5, md5, 1, TimeUnit.MINUTES);
        if(!b){
            return "一分钟禁止重复提交";
        }
        userService.save(user);
        // 获取自增的id
        String id = user.getId()+ "";
        // 使用kafka发送消息并处理发送结果
        // send参数的key和value值必须要一致!!
        kafkaTemplate.send("test",id,jsonStr).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Kafka----发送失败");
            }
            @Override
            public void onSuccess(Object result) {
                System.err.println("Kafka----发送成功:");
            }
        });
        return "添加成功";
    }
}- 监听器:MyListener.java
 
package com.llh.listener;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.llh.domain.User;
import com.llh.service.UserService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
 * User: lilinhan
 * DateTime: 2023/11/22 16:41
 */
@Component
public class MyListener {
    @Autowired
    RedissonClient redissonClient;
    @Autowired
    UserService userService;
    // 使用redisson分布式锁和ack确认消息
    @KafkaListener(topics = "test",containerFactory = "ackContainerFactory")
    public void msg(ConsumerRecord consumerRecord, Acknowledgment ack){
        Object key = consumerRecord.key();
        // 使用redisson获取锁
        RLock lock = redissonClient.getLock((String) key);
        // 如果成功获取到锁
        if(lock.tryLock()){
            try {
                JSON parse = JSONUtil.parse(consumerRecord.value());
                System.err.println("收到的消息----"+parse);
                // 根据自增id查询数据
                User userDB = userService.getById((Serializable) key);
                System.err.println("刚刚添加的数据----"+userDB);
                userDB.setKafkaMsg(JSONUtil.toJsonStr(parse));
                // 把接收到的json字符串存入mysql
                userService.updateById(userDB);
                // 手动确认消息
                ack.acknowledge();
            }catch (Exception e){
                throw  new RuntimeException(e);
            }finally {
                // 释放锁
                if(lock!=null&&lock.isHeldByCurrentThread()){
                    lock.unlock();
                }
            }
        }else {
            System.err.println("重复消费!!");
        }
    }
}4.2、RocketMQ Demo
同步消息、异步消息、延迟消息、同步有序以及队列消息示例
4.2.1、部分效果
- 控制台打印

 - 可视化界面

 - 订阅主题

 - 消费者

 
4.2.2、配置文件
- pom.xml
 
<dependencies>
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <!--spring web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rocketmq 依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>- application.yml
 
server:
    port: 9203
spring:
    application:
        name: llh-mq    # 注册导eureka上面的应用名称
# mq的配置
rocketmq:
    name-server: 127.0.0.1:9876     # 服务地址
    # 生产者
    producer:
        group: test     #生产者组
        retry-times-when-send-failed: 2     # 同步发送
        retry-times-when-send-async-failed: 2       # 异步发送
    # 消费者
    consumer:
        topic: top
        pull-batch-size: 10     # 拉取数量
        group: test4.2.3、部分代码
- 控制层:ProducerController.java
 
package com.llh.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * User: lilinhan
 * DateTime: 2023/10/27 8:48
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    RocketMQTemplate rocketMQTemplate;
    @RequestMapping("/put")
    public void put(){
        // 构建消息对象
        Message<String> message = MessageBuilder.withPayload("hello2").build();
        SendResult result = rocketMQTemplate.syncSend("sync", message, 500, 4);
        if(SendStatus.SEND_OK.equals(result.getSendStatus())){
            System.err.println("延迟消息发送成功");
        }
    }
    // 异步发送
    @RequestMapping("/async")
    public void async(){
        // 构建消息对象
        Message<String> message = MessageBuilder.withPayload("hello").build();
        rocketMQTemplate.asyncSend("async", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                SendStatus sendStatus = sendResult.getSendStatus();
                // 确认消息
                if(SendStatus.SEND_OK.equals(sendStatus)){
                    System.err.println("发送成功!消息----------"+sendStatus.name());
                }else {
                    System.err.println("发送成功!消息----------"+sendStatus.name());
                }
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("mq连接失败");
            }
        });
    }
    // 同步发送
    @RequestMapping("/sync")
    public void sync(){
        SendResult result = rocketMQTemplate.syncSend("sync", "hello1");
        if(SendStatus.SEND_OK.equals(result.getSendStatus())){
            System.err.println("发送成功");
        }
        rocketMQTemplate.syncSend("sync","hello2");
        rocketMQTemplate.syncSend("sync","hello3");
        rocketMQTemplate.syncSend("sync","hello4");
        System.err.println("发送成功");
    }
    // 同步发送 有序消息
    @RequestMapping("/sync2")
    public void async2(){
        rocketMQTemplate.syncSendOrderly("sync2", "订单1创建", "order1");
        rocketMQTemplate.syncSendOrderly("sync2", "订单1支付", "order1");
        rocketMQTemplate.syncSendOrderly("sync2", "订单1完成", "order1");
        rocketMQTemplate.syncSendOrderly("sync2", "订单2创建", "order2");
        rocketMQTemplate.syncSendOrderly("sync2", "订单2支付", "order2");
        rocketMQTemplate.syncSendOrderly("sync2", "订单2完成", "order2");
    }
}- 实现RocketMQListener接口的监听器
 
package com.llh.mq;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * User: lilinhan
 * DateTime: 2023/10/26 16:58
 */
@Component
@RocketMQMessageListener(topic = "sync",consumerGroup = "sdsd",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.err.println("接受的消息:"+s);
    }
}- 实现MessageListenerOrderly接口的监听器
 
package com.llh.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.List;
/**
 * User: lilinhan
 * DateTime: 2023/10/27 11:26
 */
@Component
public class TestListener implements MessageListenerOrderly {
    // 初始化mq
    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr("127.0.0.1:9876");      // nameserv地址
        consumer.subscribe("sync2","*");    // 订阅的主题
        consumer.setConsumerGroup("dsf");       //消费者的组
        consumer.setInstanceName("topname");
        consumer.registerMessageListener(this);   // 监听处理的对象
        consumer.start();
    }
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        // 判断消息是否为空
        if(CollectionUtils.isEmpty(list)){
            return ConsumeOrderlyStatus.SUCCESS;
        }
        // 遍历消息队列
        for (MessageExt messageExt : list) {
            int queueId = messageExt.getQueueId();
            String body = new String(messageExt.getBody());
            System.err.println("这个队列:"+queueId+"-----消息:"+body+"-------"+Thread.currentThread().getId()+":"+Thread.currentThread().getName());
        }
        // 返回成功
        return ConsumeOrderlyStatus.SUCCESS;
    }
}
                            
                            
139 comments
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
新车首发,新的一年,只带想赚米的人coinsrore.com
新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
新车上路,只带前10个人coinsrore.com
新盘首开 新盘首开 征召客户!!!coinsrore.com
新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
新车即将上线 真正的项目,期待你的参与coinsrore.com
新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com
新项目准备上线,寻找志同道合的合作伙伴
新车上路,只带前10个人
新项目准备上线,寻找志同道合的合作伙伴
新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!
新车即将上线 真正的项目,期待你的参与
新车上路,只带前10个人
新车即将上线 真正的项目,期待你的参与
新车首发,新的一年,只带想赚米的人coinsrore.com
交叉点
地平线
有趣页面
白头神探智斗灭世狂人
猫步2回归的猫
交叉点