登录
  • 欢迎访问悠扬的技术博客,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站😉

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)

RabbitMQ 悠扬 656次浏览 已收录 0个评论

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)

文章地址

前言

1. SpringBoot整合配置详解

  • publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback

  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback

注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效
生产端还可以配置其他属性,比如发送重试,超时时间,次数,间隔等

2. 代码演示

2.1 生产端

2.1.1 新建项目springboot-producer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>springboot-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-producer</name>
    <description>springboot-producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

RabbitSender.java 消息生产者

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  

    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //可以进行日志记录、异常处理、补偿处理等
                System.err.println("异常处理....");
            }else {
                //更新数据库,可靠性投递机制
            }
        }
    };

    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一  用于ack保证唯一一条消息,这边做测试写死一个。但是在做补偿策略的时候,必须保证这是全局唯一的消息
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1""springboot.abc", msg, correlationData);
    }

}

application.properties

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

2.1.2 操作管控台

添加Exchange

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
Exchange-1
消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
添加成功

添加Queue

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
Queue-1

Exchange绑定Queue

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
绑定
消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
绑定成功

修改routingKey,springboot改为spring,则进入的是returnCallback方法

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
returnCallback打印日志-错误

这时候我们发现报错了

correlationData: CorrelationData [id=1234567890]
ack: false
异常处理....
消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
returnCallback打印日志-正确

2.1.3 解决ack为false问题

这是由于我们在测试方法中进行测试,当测试方法结束,rabbitmq相关的资源也就关闭了,虽然我们的消息发送出去,但异步的ConfirmCallback却由于资源关闭而出现了上面的问题。
加入Thread.sleep()即可解决。

@Test
public void testSender1() throws Exception {
     Map<String, Object> properties = new HashMap<>();
     properties.put("number""12345");
     properties.put("send_time", simpleDateFormat.format(new Date()));
     rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
     Thread.sleep(2000);
}

2.2 消费端

消费端核心配置:

签收模式-手工签收

spring.rabbitmq.listener.simple.acknowledge-mode=manual

设置监听限制:最大10,默认5

spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

  • 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者再消费端消费失败的时候可以做到重回队列(不建议)、根据业务记录日志等处理。

  • 可以设置消费端的监听个数和最大个数,用于监控消费端的并发情况

@RabbitListener注解使用

  • 消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用

  • @RabbitListener是一个组合注解,里面可以注解配置

  • @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
@RabbitListener注解

比如在方法onMessage上加@RabbitListener注解,同时需要加另外一个注解@RabbitHandler,代码被消费者监听。

建立绑定,在Value上写上队列,设置Exchange,是否持久化,设置Exchange的类型、表达式设置为true以及路由key。通过这种简单的方式,就可以完成之前很复杂的代码逻辑。同时建议将配置放入到配置文件中,动态获取。如果mq中没有相应的队列、Exchange等,注解声明也可以创建它们,大家可以自行测试!

2.2.1 新建项目springboot-consumer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>springboot-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-consumer</name>
    <description>springboot-consumer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

RabbitReceiver.java 消息生产者

@Component
public class RabbitReceiver {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK,获取deliveryTag
        channel.basicAck(deliveryTag, false);
    }
}

application.properties

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user_cp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

运行Application,查看之前在生产端发送的消息,是否能被消费。

打印结果

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
打印结果

这里之前由于我测试的时候多发了消息,所以消费的时候会有这么多。

3. 优化代码

  • 自定义Java对象消息

  • @RabbitListener注解中的配置改为动态配置

@Payload:指定具体的消息体Body。
@Headers: 获取Headers。

3.1 消费端优化

1、先定义一个Order对象

public class Order implements Serializable {

    private String id;
    private String name;

    public Order() {
    }
    public Order(String id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

注意:我们在传输对象的时候,必须序列化。否则会传输失败。

2、RabbitReceiver添加监听

/**
     * 
     *  spring.rabbitmq.listener.order.queue.name=queue-2
        spring.rabbitmq.listener.order.queue.durable=true
        spring.rabbitmq.listener.order.exchange.name=exchange-2
        spring.rabbitmq.listener.order.exchange.durable=true
        spring.rabbitmq.listener.order.exchange.type=topic
        spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
        spring.rabbitmq.listener.order.key=springboot.*
     * @param order
     * @param channel
     * @param headers
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload com.cp.springboot.entity.Order order, 
            Channel channel, 
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

已经将配置写入到了application.properties中,进行动态获取。也可以像我们公司一样放入到配置中心当中。例如:携程开源配置中心Apollo

3、application.properties

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

3.2 生产端优化

1、同样是一个Order对象,必须跟消费端的保持一致。

2、RabbitSender添加发送消息

//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一 
    CorrelationData correlationData = new CorrelationData("0987654321");
    rabbitTemplate.convertAndSend("exchange-2""springboot.def", order, correlationData);
}

3、添加测试方法

@Test
public void testSender2() throws Exception {
     Order order = new Order("001""第一个订单");
     rabbitSender.sendOrder(order);
     //防止资源提前关闭,ConfirmCallback异步回调失败
     Thread.sleep(2000);
}

4.测试

运行testSender2()方法。

生产端打印消息

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
生产端打印消息

消费端打印消息

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
消费端打印消息

至此,RabbitMQ整合SpringBoot完毕,在实际工作中,使用场景也是差不多的。


版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)(转)
喜欢 (0)
支付宝[]
分享 (0)
悠扬
关于作者:
10年以上工作经验,从事2年微服务架构搭建工作,有大数据处理相关工作经验,使用spring全家桶包括:Spring,SpringBoot,SpringCloud 数据层组件服务使用SpringDataJpa,Mybatis以及其他第三方组件Sharding-JDBC,Sharding-Proxy分库分表。熟悉微服务,服务降级,限流,分流,做过项目源码修改,有cat,apollo,nacos使用经验,有Lostash,Elasticsearch,kibana,mysqlMHA生产实践经验,使用开源代码Apache Sarding项目,修改源码支持mysql分库分表使用年月日小时分库分表,docker做集群服务,Jekins做项目发布,GitLab做项目管理,使用docker容器部署,熟悉消息队列RabbitMQ,Kafka,ActiveMQ。RuoYi-Vue-Atomikos项目开源加入生态圈组件,项目支持分布式事务,界面添加多数据源,数据源动态配置,切面切换,多数据源事务支持,支持区域数据源配置,用于区域数据切分,数据层次分库。项目地址:https://gitee.com/zsiyang/ruoyi-vue-atomikos
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址