学习rabbitMQ

学习rabbitMQ

介绍:

(12条消息) windows环境下安装RabbitMQ(超详细)_luckySnow-julyo的博客-CSDN博客_windows安装rabbitmq

其他框架

Kafka、RabbitMQ、RocketMQ 全方位对比 - 龘人上天 - 博客园 (cnblogs.com)

整合步骤:

  1. 下载rabbitMQ,解压
  2. maven依赖
  3. 添加config类(生产者消费者应该不同项目)
  4. 启动rabbitMQ Rabbitmq的启动和停止 - .未央 - 博客园 (cnblogs.com)
  5. 访问 http://localhost:15672 username: guest password: guest

其他

1
2
3
4
5
6
7
8
9
<!-- RabbitMQ: Spring Boot AMQP starter; supplies the org.springframework.amqp
     classes used by the configuration, producer and consumer code in this note.
     No <version> is given here; presumably it is managed by the Spring Boot
     parent/BOM of the project (confirm in the full pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot web starter; needed for the @RestController / @GetMapping
     endpoints that trigger message sending. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.jcDemo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the RabbitMQ topology used by this demo: four queues, three
 * exchanges (direct, topic, fanout) and the bindings that connect them.
 *
 * <p>Despite its name, the class configures all three exchange types, not only
 * the direct one. The bean method names are intentionally left as they are
 * because each method name is also the Spring bean name.
 *
 * <p>Created 2022/9/27 14:15, project jc_demo (user ASUS).
 *
 * @author gao_quansui
 */
@Configuration
public class DirectRabbitMQConfig {

    // ------------------------------------------------------------------
    // Queues
    // ------------------------------------------------------------------

    /** @return the durable queue named {@code DirectQueue} */
    @Bean
    public Queue TestDirectQueue() {
        final boolean durable = true;
        return new Queue("DirectQueue", durable);
    }

    /** @return the durable queue named {@code TopicQueue} */
    @Bean
    public Queue TopicQueue() {
        final boolean durable = true;
        return new Queue("TopicQueue", durable);
    }

    /**
     * @return the first fanout queue, {@code Fanout.Queue1}, created with the
     *         name-only constructor (Spring AMQP defaults apply)
     */
    @Bean
    Queue FanoutQueue1() {
        Queue first = new Queue("Fanout.Queue1");
        return first;
    }

    /**
     * @return the second fanout queue, {@code Fanout.Queue2}, created with the
     *         name-only constructor (Spring AMQP defaults apply)
     */
    @Bean
    Queue FanoutQueue2() {
        Queue second = new Queue("Fanout.Queue2");
        return second;
    }

    // ------------------------------------------------------------------
    // Exchanges (all durable, none auto-deleted)
    // ------------------------------------------------------------------

    /** @return the direct exchange named {@code DirectExchange} */
    @Bean
    DirectExchange DirectExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange("DirectExchange", durable, autoDelete);
    }

    /** @return the topic exchange named {@code TopicExchange} */
    @Bean
    TopicExchange TopicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("TopicExchange", durable, autoDelete);
    }

    /** @return the fanout exchange named {@code FanoutExchange} */
    @Bean
    FanoutExchange FanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("FanoutExchange", durable, autoDelete);
    }

    // ------------------------------------------------------------------
    // Bindings
    // ------------------------------------------------------------------

    /**
     * Direct mode: a single, one-to-one link between the direct queue and the
     * direct exchange, selected by the exact routing key.
     *
     * @return the binding of {@code DirectQueue} to {@code DirectExchange}
     *         with routing key {@code DirectRoutingKey}
     */
    @Bean
    Binding bindingDirect() {
        Queue queue = TestDirectQueue();
        DirectExchange exchange = DirectExchange();
        return BindingBuilder.bind(queue).to(exchange).with("DirectRoutingKey");
    }

    /**
     * Topic mode: one queue can be fed by several producers. The {@code #}
     * wildcard pattern matches many routing keys, so the consumer of this
     * queue receives the messages of every producer whose key starts with
     * {@code TopicRoutingKey.}.
     *
     * @return the binding of {@code TopicQueue} to {@code TopicExchange} with
     *         pattern {@code TopicRoutingKey.#}
     */
    @Bean
    Binding bindingTopic() {
        Queue queue = TopicQueue();
        TopicExchange exchange = TopicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("TopicRoutingKey.#");
    }

    /**
     * Fanout (broadcast) mode: one exchange can be bound to several queues and
     * no routing key is involved.
     *
     * @return the binding of {@code Fanout.Queue1} to {@code FanoutExchange}
     */
    @Bean
    Binding bindingFanout1() {
        Queue queue = FanoutQueue1();
        FanoutExchange exchange = FanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Second fanout binding; see {@link #bindingFanout1()} for the mode.
     *
     * @return the binding of {@code Fanout.Queue2} to {@code FanoutExchange}
     */
    @Bean
    Binding bindingFanout2() {
        Queue queue = FanoutQueue2();
        FanoutExchange exchange = FanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.jcDemo.controller.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * REST endpoints that act as demo producers: each GET request publishes one
 * map-shaped message to the direct, topic or fanout exchange.
 *
 * <p>Every message carries the keys {@code messageId} (random UUID),
 * {@code messageData} (fixed text per endpoint) and {@code createTime}
 * (current local time formatted as {@code yyyy-MM-dd HH:mm:ss}).
 *
 * <p>Created 2022/9/27 14:18, project jc_demo (user ASUS).
 *
 * @author gao_quansui
 */
@RestController
public class MQTest {

    /** Shared timestamp format; DateTimeFormatter is immutable and thread-safe. */
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // RabbitTemplate provides the send/receive operations used below.
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to {@code DirectExchange} with routing key
     * {@code DirectRoutingKey}.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        return publish("DirectExchange", "DirectRoutingKey", "direct producer message, hello!");
    }

    /**
     * Publishes one message to {@code TopicExchange} with routing key
     * {@code TopicRoutingKey.test1}.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        return publish("TopicExchange", "TopicRoutingKey.test1", "topic producer1 message, hello!");
    }

    /**
     * Publishes one message to {@code TopicExchange} with routing key
     * {@code TopicRoutingKey.test2}.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        return publish("TopicExchange", "TopicRoutingKey.test2", "topic producer2 message, hello!");
    }

    /**
     * Publishes one message to {@code FanoutExchange}; the routing key is the
     * empty string because fanout bindings in this demo use no key.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        return publish("FanoutExchange", "", "fanout producer message, hello!");
    }

    /**
     * Builds the payload and sends it together with the routing key.
     *
     * @param exchange    name of the target exchange
     * @param routingKey  routing key attached to the message
     * @param messageData text stored under the {@code messageData} key
     * @return the literal string {@code "ok"} returned by every endpoint
     */
    private String publish(String exchange, String routingKey, String messageData) {
        rabbitTemplate.convertAndSend(exchange, routingKey, buildPayload(messageData));
        return "ok";
    }

    /**
     * Creates the message map shared by all endpoints.
     *
     * @param messageData text stored under the {@code messageData} key
     * @return a new mutable map with {@code messageId}, {@code messageData}
     *         and {@code createTime}
     */
    private static Map<String, Object> buildPayload(String messageData) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("messageId", UUID.randomUUID().toString());
        payload.put("messageData", messageData);
        payload.put("createTime", LocalDateTime.now().format(CREATE_TIME_FORMAT));
        return payload;
    }
}

消费者-Direct

1
2
3
4
5
6
7
8
9
10
/**
 * Consumer for the {@code DirectQueue} queue; logs every map payload it
 * receives.
 */
@Slf4j
@Component
@RabbitListener(queues = "DirectQueue")
public class ConsumerDirect {

    /**
     * Handles a map-shaped message from {@code DirectQueue}.
     *
     * @param testMessage the received payload; the producer in this demo sends
     *                    a map with String keys
     */
    @RabbitHandler
    public void process(Map<String, Object> testMessage) {
        // Pass the map itself: SLF4J renders it lazily and a null payload is
        // logged as "null" instead of throwing from an explicit toString().
        log.info("DirectQueue的消费者收到消息{}", testMessage);
    }
}

消费者-Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.rabbit_consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.stereotype.Component;

import javax.naming.Binding;
import java.util.Map;

/**
 * Consumer for the {@code TopicQueue} queue; logs every map payload it
 * receives.
 *
 * <p>Two producers publish to this queue (routing keys
 * {@code TopicRoutingKey.test1} and {@code TopicRoutingKey.test2}), so this
 * consumer receives the messages of both, unlike the one-to-one relationship
 * of the direct setup.
 *
 * <p>Created 2022/10/9 9:59, project rabbit_consumer (user ASUS).
 *
 * @author gao_quansui
 */
@Slf4j
@Component
@RabbitListener(queues = "TopicQueue")
public class ConsumerTopic {

    /**
     * Handles a map-shaped message from {@code TopicQueue}.
     *
     * @param map the received payload; the producers in this demo send a map
     *            with String keys
     */
    @RabbitHandler
    public void process(Map<String, Object> map) {
        // Pass the map itself: SLF4J renders it lazily and a null payload is
        // logged as "null" instead of throwing from an explicit toString().
        log.info("TopicQueue received:{}", map);
    }
}

消费者-Fanout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.rabbit_consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumers for the two fanout queues. Each listener method is attached to
 * one queue and logs the map payload it receives, so a single message sent to
 * the fanout exchange is logged once per queue.
 *
 * <p>{@code @RabbitListener} is placed directly on the methods here, so no
 * {@code @RabbitHandler} is used (that annotation is used together with a
 * class-level {@code @RabbitListener}, as in the other consumers).
 *
 * <p>Created 2022/10/9 13:45, project rabbit_consumer (user ASUS).
 *
 * @author gao_quansui
 */
@Slf4j
@Component
public class ConsumerFanout {

    /**
     * Handles a map-shaped message from {@code Fanout.Queue1}.
     *
     * @param map the received payload; the producer in this demo sends a map
     *            with String keys
     */
    @RabbitListener(queues = "Fanout.Queue1")
    public void process1(Map<String, Object> map) {
        log.info("Consumer1 receive from FanoutQueue:{}", map);
    }

    /**
     * Handles a map-shaped message from {@code Fanout.Queue2}.
     *
     * @param map the received payload; the producer in this demo sends a map
     *            with String keys
     */
    @RabbitListener(queues = "Fanout.Queue2")
    public void process(Map<String, Object> map) {
        log.info("Consumer2 receive from FanoutQueue:{}", map);
    }
}

总结

  1. 三种模式

    1. Direct 直连 一一对应
    2. Topic 主题连接 利用*,# 进行匹配 一个交换机对应多个生产者
      • (*)只能向后匹配一个单词 即 a.* → a.a(可以) a.a.a(不可以)
      • (#)可以向后匹配零个或多个单词 即 a.# → a.a(可以) a.a.a(可以)
    3. Fanout 广播模式 一个交换机分发到多个队列,队列对应到相应的消费者
  2. 疑问:

    1. 能否实现一个生产者,一个交换机,一个队列,分发多个消费者?