RabbitMQ整合springboot

springboot整合RabitMQ

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写连接信息

1
2
3
4
5
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/rabbitmq_study
spring.rabbitmq.username=zyz
spring.rabbitmq.password=2824199842

helloword模型

生产者生产消息,消费者一直等待消息到来。

添加配置类MqConfig

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class MqConfig {

/**
* 创建 hello队列 并交给容器管理
* @return
*/
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}

生产者

1
2
3
4
5
6
7
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testHello() {
// 发布消息
rabbitTemplate.convertAndSend("hello","hello,world");
}

消费者

1
2
3
4
5
6
7
8
@Component
public class HelloConsumer {

@RabbitListener(queues = "hello")
public void receive(String message){
System.out.println("Message:"+message);
}
}

work模型

多个消费者绑定同一个队列,共同消费其中的消息。解决消息积压问题。

配置类MqConfig中添加work队列

1
2
3
4
@Bean
public Queue workQueue(){
return new Queue("work");
}

生产者:

1
2
3
4
5
6
@Test 
void testWork(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello,workQueue");
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class WorkConsumer {

@RabbitListener(queues ="work")
public void receive1(String message){
System.out.println("WorkConsumer1接收消息:"+message);
}

@RabbitListener(queues = "work")
public void receive2(String message){
System.out.println("WorkConsumer2接收消息:"+message);
}
}

Publish/Subscribe Fanout模型

广播模型,每个消费者绑定不同的队列,每个队列绑定在交换机上,生产者生产消息给交换机,交换机负责消息的分发。实现一条消息被多个消费者消费。

在配置类MqConfig中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 // 注入队列fanout1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout1");
}
// 注入队列fanout2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout2");
}
// 注入交换机fanout.ex
@Bean
public FanoutExchange fanoutEx(){
return new FanoutExchange("fanout.ex");
}

// 绑定队列fanout1和交换机fanout.ex
@Bean
public Binding fanout1ToEx(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutEx());
}

// 绑定队列fanout2和交换机fanout.ex
@Bean
public Binding fanout2ToEx(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutEx());
}

生产者:

1
2
3
4
 @Test
void testFanout(){
rabbitTemplate.convertAndSend("fanout.ex","","hello,fanout");
}

消费者:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "fanout1")
public void handleFanout1(String msg){
System.out.println("消费者1:"+msg);
}

@RabbitListener(queues = "fanout2")
public void handleFanout2(String msg){
System.out.println("消费者2:"+msg);
}

Publish/Subscribe Routing-Direct模型

让不同的消息被不同的队列消费

在配置类MqConfig中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Bean
public Queue directQueue1(){
return new Queue("directQueue1");
}

@Bean
public Queue directQueue2(){
return new Queue("directQueue2");
}

@Bean
public DirectExchange directExchange(){
return new DirectExchange("routing.direct.ex");
}

@Bean
public Binding directQueue1ToExWithLog(){
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("log");
}

@Bean
public Binding directQueue2ToExWithUser(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("user");
}
@Bean
public Binding directQueue2ToExWithOrder(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("order");
}

生产者:

1
2
3
4
5
6
@Test
void testRoutingDirect(){
rabbitTemplate.convertAndSend("routing.direct.ex","log","hello,log");
rabbitTemplate.convertAndSend("routing.direct.ex","user","hello,user");
rabbitTemplate.convertAndSend("routing.direct.ex","order","hello,order");
}

消费者

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "directQueue1")
public void handleDirectQueue1(String msg){
System.out.println("消费者1:"+msg);
}

@RabbitListener(queues = "directQueue2")
public void handleDirectQueue2(String msg){
System.out.println("消费者2:"+msg);
}

Publish/Subscribe Routing-Topic模型

可使用通配符将交换机与队列绑定

在配置类MqConfig中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Bean
public Queue topicQueue1(){
return new Queue("topicQueue1");
}

@Bean
public Queue topicQueue2(){
return new Queue("topicQueue2");
}

@Bean
public TopicExchange topicExchange(){
return new TopicExchange("routing.topic.ex");
}


@Bean
public Binding topicQueue1ToExWithUser(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.*");
}

@Bean
public Binding topicQueue2ToExWithUser(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}

生产者:

1
2
3
4
5
6
@Test
void testRoutingTopic(){
rabbitTemplate.convertAndSend("routing.topic.ex","user","hello,user");
rabbitTemplate.convertAndSend("routing.topic.ex","user.info","hello,user.info");
rabbitTemplate.convertAndSend("routing.topic.ex","user.info.crud","hello,user.info.crud");
}

消费者:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "topicQueue1")
public void handleTopicQueue1(String msg){
System.out.println("消费者user.*:"+msg);
}

@RabbitListener(queues = "topicQueue2")
public void handleTopicQueue2(String msg){
System.out.println("消费者user.#:"+msg);
}

结果:

1
2
3
4
消费者user.*:hello,user.info
消费者user.#:hello,user
消费者user.#:hello,user.info
消费者user.#:hello,user.info.crud

实际使用

转换为json字符串发送

1
2
3
4
5
6
7
 @Test
void testHello() {
HashMap<String,User> map = new HashMap<>();
User user = new User(111L, "张小红", 21);
map.put("userInfo",user);
rabbitTemplate.convertAndSend("hello", JSON.toJSONString(map));
}

消息监视

在配置文件中添加

1
2
3
4
# 开启消息到达交换机的确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息未达到队列的回调
spring.rabbitmq.publisher-returns=true

添加配置类RabbitMsgWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class RabbitMsgWatch implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

private Logger logger = LoggerFactory.getLogger(RabbitMsgWatch.class);
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 设置监听
*/
@PostConstruct // 后置增强 bean对象创建后开始生效
public void setConfig(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/**
* 消息到交换机的回调
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(JSON.toJSONString(correlationData));
logger.info(JSON.toJSONString(ack));
logger.info(JSON.toJSONString(cause));
}

/**
* 消息未到达队列的回调
* @param returned
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.info(JSON.toJSONString(returned));
}
}

消息签收

消息的签收方式分为,自动签收和手动签收

  • 自动签收

    吞吐量很高,用于数据量比较大,数据不是特别重要的场景,例如日志。消息只要被客户端接收到,无论客户端发生了什么,服务器都会删除这条消息。springboot项目默认开启自动签收。

  • 手动签收

    吞吐量低,用于数据重要的场景,例如操作数据库。在sprboot项目中需要手动来开启。

1
2
3
4
# 开启手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 直连交换机开启手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual

发送消息时设置消息id

1
2
3
4
5
6
7
rabbitTemplate.convertAndSend("directs", "error", "我是一个重要消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)throws AmqpException {
//自己给消息设置自定义的ID
String messageId= UUID.randomUUID().toString().replace("-",""); message.getMessageProperties().setMessageId(messageId);
return message;
}

消息的签收与拒收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 参数说明:
* deliveryTag 消息投递ID,要签收的投递ID是多少
* multiple:是否批量签收
*/
channel.basicAck(deliveryTag,false);

/**
* 参数说明:
* deliveryTag 消息投递ID,要签收的投递ID是多少
* multiple:是否批量签收
* requeue: true 代表拒绝签收并把消息重新放回到队列里面 false就直接拒绝
*/
channel.basicNack(deliveryTag,false,true);

拒收消息,其实是为了保护消息,当消费消息发生异常时,我们可以把消息放在队列里面,重新让别人消费,而不是丢了它!

死循环的处理:

不签收,并且让它回到队列里面,想法很好,但是很容易造成死循环,因为没有任何人能消费它。

我们设计一个机制,当一个消息被消费3次还没有消费成功,我们就直接把它记录下来,人工处理!

消息消费3次(消息的标识,消息的计数)

我们引入Redis ,使用Redis 计数,若超过3次,直接拒绝消息,并且不回到队列里面

重复消费问题

重复消费的情况:消费者消费消息之后,还没来得及确认消费,突然宕机了,此时队列里面的该消息并没有被消费掉,就会发生消息重复消费的问题。

解决重复消费的问题,最终就是要保证消息的幂等性,也就是消费重复消费同一条消息最终得到的结果与消费一条消息一样。

解决方案:
乐观锁机制保证:

1、通过数据库加入版本号比对

2、redis中存入全局唯一id比对

延迟消息

当发送消息到服务器时,不是马上就放到消息队列中,而是在mq服务器里面建立一个定时任务,在服务器到达延时时间后再执行投递的操作。

实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 延迟队列  
@Bean
public Queue delayQueue(){
HashMap<String, Object> args = new HashMap<>();
// 把一个队列修改为延迟队列
// 消息的最大存活时间
args.put("x-message-ttl",10*1000) ;
// 该队列里面的消息死了,去那个交换机
args.put("x-dead-letter-exchange","DeadLetter.exc");
// 该队列里面的消息死了,去那个交换机, 由那个路由key
args.put("x-dead-letter-routing-key","DeadLetter.key");
return new Queue("delay",true,false,false,args);
}

// 交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("DeadLetter.exc");
}

// 绑定死信队列
@Bean
public Binding newAndDeadLetterExchange(){
return BindingBuilder.bind(deadlyQueue()).to(deadLetterExchange()).
with("DeadLetter.key"); // 死信路由key
}

// 死信队列
@Bean
public Queue deadlyQueue(){
return new Queue("deadly.queue") ;
}