引入依赖
1 | <dependency> |
编写连接信息
1 | 127.0.0.1 = |
helloword模型
生产者生产消息,消费者一直等待消息到来。
添加配置类MqConfig
1 |
|
生产者
1 |
|
消费者
1 |
|
work模型
多个消费者绑定同一个队列,共同消费其中的消息。解决消息积压问题。
配置类MqConfig中添加work队列
1 |
|
生产者:
1 |
|
消费者:
1 |
|
Publish/Subscribe Fanout模型
广播模型,每个消费者绑定不同的队列,每个队列绑定在交换机上,生产者生产消息给交换机,交换机负责消息的分发。实现一条消息被多个消费者消费。
在配置类MqConfig中添加
1 | // 注入队列fanout1 |
生产者:
1 |
|
消费者:
1 | "fanout1") (queues = |
Publish/Subscribe Routing-Direct模型
让不同的消息被不同的队列消费
在配置类MqConfig中添加
1 |
|
生产者:
1 |
|
消费者
1 | "directQueue1") (queues = |
Publish/Subscribe Routing-Topic模型
可使用通配符将交换机与队列绑定
在配置类MqConfig中添加
1 |
|
生产者:
1 |
|
消费者:
1 | "topicQueue1") (queues = |
结果:
1 | 消费者user.*:hello,user.info |
实际使用
转换为json字符串发送
1 |
|
消息监视
在配置文件中添加
1 | # 开启消息到达交换机的确认机制 |
添加配置类RabbitMsgWatch
1 |
|
消息签收
消息的签收方式分为,自动签收和手动签收
自动签收
吞吐量很高,用于数据量比较大,数据不是特别重要的场景,例如日志。消息只要被客户端接收到,无论客户端发生了什么,服务器都会删除这条消息。springboot项目默认开启自动签收。
手动签收
吞吐量低,用于数据重要的场景,例如操作数据库。在sprboot项目中需要手动来开启。
1 | # 开启手动ack |
发送消息时设置消息id
1 | rabbitTemplate.convertAndSend("directs", "error", "我是一个重要消息", new MessagePostProcessor() { |
消息的签收与拒收
1 | /** |
拒收消息,其实是为了保护消息,当消费消息发生异常时,我们可以把消息放在队列里面,重新让别人消费,而不是丢了它!
死循环的处理:
不签收,并且让它回到队列里面,想法很好,但是很容易造成死循环,因为没有任何人能消费它。
我们设计一个机制,当一个消息被消费3次还没有消费成功,我们就直接把它记录下来,人工处理!
消息消费3次(消息的标识,消息的计数)
我们引入Redis ,使用Redis 计数,若超过3次,直接拒绝消息,并且不回到队列里面
重复消费问题
重复消费的情况:消费者消费消息之后,还没来得及确认消费,突然宕机了,此时队列里面的该消息并没有被消费掉,就会发生消息重复消费的问题。
解决重复消费的问题,最终就是要保证消息的幂等性,也就是消费重复消费同一条消息最终得到的结果与消费一条消息一样。
解决方案:
乐观锁机制保证:
1、通过数据库加入版本号比对
2、redis中存入全局唯一id比对
延迟消息
当发送消息到服务器时,不是马上就放到消息队列中,而是在mq服务器里面建立一个定时任务,在服务器到达延时时间后再执行投递的操作。
实现细节:
1 | // 延迟队列 |