登录
  • 欢迎访问悠扬的技术博客,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站😉

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)

RabbitMQ 悠扬 1406次浏览 已收录

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)

文章地址

前言

1. SpringCloudStream整合

  • Spring Cloud,这个全家桶框架在整个中小型互联网公司异常的火爆,那么相对应的Spring Cloud Stream 就渐渐的被大家所重视起来,这一节课主要来介绍Spring Cloud Stream如何与RabbitMQ进行集成。

架构介绍

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
架构图

Destination Binder:包含自己的应用Application
左边是RabbitMQ、右边是Kafka。表示消息的生产与发送,可以是不同的消息中间件。这是Spring Cloud Stream 最上层的一个抽象。非常好的地方。

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
Spring Cloud Stream核心架构1

两个比较重要的地方:inputs(输入)消息接收端、outputs(输出)消息发送端

Application Core:核心应用
Binder:作为“协调者”角色
Middleware:

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
Spring Cloud Stream核心架构2

黄色:表示RabbitMQ
绿色:插件,消息的输入输出都套了一层插件,插件可以用于各种各样不同的消息,也可以用于消息中间件的替换。

核心概念:
Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:
@Output:输出注解,用于定义发送消息接口
@Input:输入注解,用于定义消息的消费者接口
@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常合适,但是使用SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。目前SpringCloudStream整合了RabbitMQ与Kafka,我们都知道Kafka是无法进行消息可靠性投递的,这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本SpringCloudStream的定位。
因此在实际的工作中,可以采用SpringCloudStream,如果需要保证可靠性投递,也可以单独采用RabbitMQ,也是可以的。

2. 代码演示

2.1 生产端

2.1.1 创建rabbitmq-springcloudstream-producer工程

1、pom.xml引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>rabbitmq-springcloudstream-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-springcloudstream-producer</name>
    <description>rabbitmq-spring</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>   
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>1.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2、定义Barista接口

/**
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */
public interface Barista {

    String OUTPUT_CHANNEL = "output_channel";  
    //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。  
    @Output(Barista.OUTPUT_CHANNEL)
    MessageChannel logoutput();  
}  

3、定义RabbitmqSender类

//启动这个绑定
@EnableBinding(Barista.class)
@Service //注入到spring容器  
public class RabbitmqSender {  

    //注入Barista
    @Autowired  
    private Barista barista;  

    // 发送消息
    public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
        try{
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            boolean sendStatus = barista.logoutput().send(msg);
            System.err.println("--------------sending -------------------");
            System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
        }catch (Exception e){  
            System.err.println("-------------error-------------");
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());

        }  
        return null;
    }  

}  

4、application.properties

server.port=8001
server.servlet.context-path=/producer

spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
## group相当于RabbitMQ中Queue的名称
spring.cloud.stream.bindings.output_channel.group=queue-3
##以下为集群环境配置,rabbit_cluster与下面的spring.cloud.stream.binders.rabbit_cluster是对应的。
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=user_cp
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_cp

2.1 消费端

1、pom.xml引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>rabbitmq-springcloudstream-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-springcloudstream-consumer</name>
    <description>rabbitmq-spring</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>   
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>       
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>1.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2、定义Barista接口

/**
 * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
 * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
 */
public interface Barista {

    String INPUT_CHANNEL = "input_channel";  

    //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  
    @Input(Barista.INPUT_CHANNEL)  
    SubscribableChannel loginput();  


}  

3、定义RabbitmqReceiver类

//启动binding
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {  


    @StreamListener(Barista.INPUT_CHANNEL)  
    public void receiver(Message message) throws Exception {  

        //手工签收必须要有channel与deliveryTag
        Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("Input Stream 1 接受数据:" + message);
        System.out.println("消费完毕------------");
        //批量签收设置为false
        channel.basicAck(deliveryTag, false);
    }  
}

4、application.properties

server.port=8002
server.context-path=/consumer

spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
##默认监听数
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
##针对消费端channel进行设置,是否支持requeue,重回队列
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
##是否支持签收,签收模式:手工签收
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
##服务重连
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
##是否持久化订阅
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true


##最大监听数
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5

##采用rabbitmq方式,也可以采用kafka
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=user_cp
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_cp

3. 运行测试

3.1 运行消费端

启动项目后,查看管控台是否生成了Exchange与Queue

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
Exchange

 

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
Queue

启动项目后,SpringCloudStream生成了Exchange与Queue。

3.2 运行生产端测试代码

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private RabbitmqSender rabbitmqSender;


    @Test
    public void sendMessageTest1() throws InterruptedException {
       for(int i = 0; i < 1; i ++){
           try {
               Map<String, Object> properties = new HashMap<String, Object>();
               properties.put("SERIAL_NUMBER""12345");
               properties.put("BANK_NUMBER""abc");
               properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
               rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties);

           } catch (Exception e) {
               System.out.println("--------error-------");
               e.printStackTrace(); 
           }
       }
       TimeUnit.SECONDS.sleep(2000);
    }

}

运行测试方法:sendMessageTest1(),查看打印结果。

生产端打印结果:

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
生产端打印结果

消费端打印结果:

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
消费端打印结果

小结

通过这几章,我们学习了Spring AMQP的相关知识,通过学习,我们队RabbitMQ继承Spring有了一个深入的认识,这样为我们后续的学习、工作使用都打下了坚实的基础,最后我们整合了SpringBoot与SpringCloudStream,更方便更高效的集成到我们的应用服务中去!
学习了三个大块,学习了RabbitMQ,消息的声明、模板消息、SimpleMessageListenerContainer、MessageListenerAdapter、MessageConverter。支持String的转换器、Json转换器、Java对象的转换、多个对象转换、流式转换(图片、文件)。与SpringBoot整合非常非常的简化,只需要几步配置就能够发送和接收消息。SpringCloudStream本质上多添加一个代理层input与output。为了以后MQ替换了,通道级别的代码可以不做修改。生产端可以是RabbitMQ,消费端是Kafka。


版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全) (转)
喜欢 (0)
支付宝[]
分享 (0)
悠扬
关于作者:
10年以上工作经验:6年以上微服务架构设计搭建经验。 曾任岗位:项目经理、架构师。 擅长领域:大数据、数据库,架构设计,资源优化。 获得业绩: 1.实用新型发明专利1个,修改Apache Sharding源码设计实现分库分表程序增强方案。 2.开源项目一个:https://gitee.com/zsiyang/ruoyi-vue-atomikos (加入开源生态圈)。 3.个人技术博客地址:https://www.nxhz1688.com