登录
  • 欢迎访问悠扬的技术博客,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站😉

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

RabbitMQ 悠扬 522次浏览 已收录 0个评论

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

文章地址:https://www.cnblogs.com/coder-programming/p/11383341.html

前言

本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。
我们的消费端进行监听RabbitMQ,当发现队列中有消息后,就进行消费。

1. 环境准备

本次整合主要采用SpringBoot框架,需要对SpringBoot的使用有一定了解。

2.大概步骤

我们来看下大概步骤:

  • ConnectionFacorty:获取连接工厂
  • Connection:一个连接
  • Channel:数据通信信道,可发送和接收消息
  • Queue:具体的消息存储队列
  • Producer & Consumer 生产者和消费者

这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
Channel是我们RabbitMQ所有消息进行交互的关键。

3. 项目实战

3.1 连接工厂


/**
 * 
* @ClassName: ConnectionUtils 
* @Description: 连接工具类
* @author Coder编程
* @date 2019年6月21日 上午22:28:22 
*
 */
public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);//amqp协议 端口 类似与mysql的3306
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/vhost_cp");
        factory.setUsername("user_cp");
        factory.setPassword("123456");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

3.2 生产端


/**
 * 
* @ClassName: Producer 
* @Description: 生产者
* @author Coder编程
* @date 2019年7月30日 上午21:04:43 
*
 */
public class Producer {

	
	public static void main(String[] args) throws Exception {
	
		System.out.println("Producer start...");
		
		//1 创建ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		
		//2 通过connection创建一个Channel
		Channel channel = connection.createChannel();
		
		//3 通过Channel发送数据
for(int i=0; i < 5; i++){
			String msg = "Hello RabbitMQ!";
			//1 exchange   2 routingKey 
			channel.basicPublish("", "test001", null, msg.getBytes());
		}

		//4 记得要关闭相关的连接
		channel.close();
		connection.close();
	}
}



3.3 消费端


/**
 * 
* @ClassName: Consumer 
* @Description: 消费端
* @author Coder编程
* @date 2019年7月30日 上午21:08:12 
*
 */
public class Consumer {

	public static void main(String[] args) throws Exception {
		
		System.out.println("Consumer start...");
		
		//1 创建ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		
		//2通过connection创建一个Channel
		Channel channel = connection.createChannel();
		
		//3声明(创建)一个队列
		String queueName = "test001";
		channel.queueDeclare(queueName, true, false, false, null);
		
		//4创建消费者
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		
		//5设置Channel
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			//6 获取消息
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.err.println("消费端: " + msg);
			//Envelope envelope = delivery.getEnvelope();
		}
		
	}
}




3.4 源码解析


channel.queueDeclare(queueName, true, false, false, null);

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

第一个参数:queuename:队列的名称
第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除
第五个参数:arguments:扩展参数


channel.basicConsume(QUEUE_NAME, true, consumer);

第二个参数 autoAck:自动签收消息

3.5 运行程序

(1)启动消费端
消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

(2)查看管控台
消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

可以看到已经有一个连接,一个信道,一个消费者等信息了。

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

可以看到信道目前的状态是空闲状态。

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

队列中多了test001队列。

关于管控台的介绍可以看这篇文章:消息中间件——RabbitMQ(四)命令行与管控台的基本操作!

(3)运行生产端

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)

可以看到生产端发送完消息之后停下了,消费端迅速接收到了消息。也可以继续通过管控台观察消费的情况。

(4) 问题

注意:

这里面可能有一个问题:为什么要先启动消费端呢?

因为在消费端创建的队列,我们必须要有队列,才能够发送消息。

另一个问题:在生产端代码中:


channel.basicPublish("", "test001", null, msg.getBytes());

并没有设置exchange,只设置了队列名称,消费端却依然能够消费到消息,这是为什么呢?

答:发消息的一定要指定Exchange,如果不指定Exchange或者Exchange为空的话,它会默认走第一个

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)
它的路由规则:将相同命名的队列Queue的消息路由过去,如果路由不过去,将会把消息删除。

 


版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!(转)
喜欢 (0)
支付宝[]
分享 (0)
悠扬
关于作者:
10年以上工作经验,从事2年微服务架构搭建工作,有大数据处理相关工作经验,使用spring全家桶包括:Spring,SpringBoot,SpringCloud 数据层组件服务使用SpringDataJpa,Mybatis以及其他第三方组件Sharding-JDBC,Sharding-Proxy分库分表。熟悉微服务,服务降级,限流,分流,做过项目源码修改,有cat,apollo,nacos使用经验,有Lostash,Elasticsearch,kibana,mysqlMHA生产实践经验,使用开源代码Apache Sarding项目,修改源码支持mysql分库分表使用年月日小时分库分表,docker做集群服务,Jekins做项目发布,GitLab做项目管理,使用docker容器部署,熟悉消息队列RabbitMQ,Kafka,ActiveMQ。RuoYi-Vue-Atomikos项目开源加入生态圈组件,项目支持分布式事务,界面添加多数据源,数据源动态配置,切面切换,多数据源事务支持,支持区域数据源配置,用于区域数据切分,数据层次分库。项目地址:https://gitee.com/zsiyang/ruoyi-vue-atomikos
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址