RabbitMQ 的快速使用
docker部署rabbitmq
# Only the "management" image tag ships with the management web UI
docker pull rabbitmq:management
# Create the container and start it.
# Every option line ends with a backslash so the shell reads the whole
# thing as ONE docker run command; the image name must come last.
docker run \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  -itd \
  rabbitmq:management
# Check the container status
docker ps -a
导入RabbitMQ依赖
pom.xml
<!-- AMQP starter, includes RabbitMQ support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Jackson JSON data binding, required by Jackson2JsonMessageConverter.
     The XML data format module is not needed for a JSON converter.
     No <version> here: like the starter above, the version is left to the
     project's Spring Boot dependency management so that all Jackson modules
     stay on the same release. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
提供者和消费者的配置
application.yml
# Connection settings; every key must be nested under spring.rabbitmq
spring:
  rabbitmq:
    host: 192.168.137.139 # host name
    port: 5672 # port
    virtual-host: / # virtual host
    username: admin # user name
    password: admin # password
消息转换器
提供者和消费者都可以添加
RabbitMQMessageConverterConfig.java
/**
 * Spring configuration that contributes the message converter bean.
 */
@Configuration
public class RabbitMQMessageConverterConfig {

    /**
     * Creates the {@link MessageConverter} bean backed by
     * {@link Jackson2JsonMessageConverter}.
     *
     * @return a new {@code Jackson2JsonMessageConverter} instance
     */
    @Bean
    public static MessageConverter jsonMessageConverter() {
        final MessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }
}
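注意执行顺序
:必须保证队列在发送消息之前已经存在!需要先启动consumer完成队列的监听/声明,publisher再往队列里发送消息才会有用,否则消息无法被路由到任何队列,会被直接丢弃(白发)。另外注意:@RabbitListener(queues = "...") 只负责监听、不会创建队列;想让consumer自动创建队列,需要使用 queuesToDeclare 或 bindings。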
SimpleQueue
提供者: SimpleQueuePublisher.java
/**
 * Spring Boot test that publishes one text message for the simple-queue demo.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SimpleQueuePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single greeting using the two-argument {@code convertAndSend},
     * passing the queue name {@code simple.queue} as the routing key.
     */
    @Test
    public void simpleQueueTest() {
        final String queueName = "simple.queue";
        final String payload = "hello, simple queue";
        rabbitTemplate.convertAndSend(queueName, payload);
    }
}
消费者:SimpleQueueConsumer.java
@Component
public class SimpleQueueConsumer {
@RabbitListener(queues = "simple.queue")
public void simpleQueueConsumer(String msg) {
System.out.println("simpleQueueConsumer: " + msg);
}
}
Work Queues
可加配置
application.yml
# Listener tuning; the key path is spring.rabbitmq.listener.simple.prefetch
spring:
  rabbitmq:
    listener:
      simple: # "simple" listener container type
        prefetch: 1 # number of messages a consumer prefetches at a time
消费者:WorkQueueConsumer.java
/**
 * Two listeners attached to the same {@code work.queue}. They differ only in
 * the log prefix and in how long they sleep after printing a message.
 */
@Component
public class WorkQueueConsumer {

    /** Sleep time of the first listener after each message, in milliseconds. */
    private static final long FIRST_LISTENER_PAUSE_MS = 10;

    /** Sleep time of the second listener after each message, in milliseconds. */
    private static final long SECOND_LISTENER_PAUSE_MS = 90;

    /**
     * Prints the message, then sleeps for the shorter pause.
     *
     * @param msg the message body converted to a string
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queuesToDeclare = @Queue(name = "work.queue"))
    public void workQueue1Consumer(String msg) throws InterruptedException {
        final String line = "workQueue1Consumer: " + msg;
        System.out.println(line);
        Thread.sleep(FIRST_LISTENER_PAUSE_MS);
    }

    /**
     * Prints the message, then sleeps for the longer pause.
     *
     * @param msg the message body converted to a string
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queuesToDeclare = @Queue(name = "work.queue"))
    public void workQueue2Consumer(String msg) throws InterruptedException {
        final String line = "workQueue2Consumer: " + msg;
        System.out.println(line);
        Thread.sleep(SECOND_LISTENER_PAUSE_MS);
    }
}
提供者: WorkQueuePublisher.java
/**
 * Spring Boot test that publishes a batch of messages for the work-queue demo.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class WorkQueuePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends 100 messages, numbered 1 through 100, with {@code work.queue}
     * as the routing key.
     */
    @Test
    public void workQueueTest() {
        final String queueName = "work.queue";
        int seq = 1;
        while (seq <= 100) {
            final String payload = "hello, work queue. " + seq;
            rabbitTemplate.convertAndSend(queueName, payload);
            seq++;
        }
    }
}
发布/订阅
Fanout
消费者:FanoutQueueConsumer.java
@Exchange
和@Queue
注解中的declare
属性默认为"true"
,如果不存在会自动创建exchange和queue。
/**
 * Two listeners, each on its own queue, both bound to the same fanout exchange.
 */
@Component
public class FanoutQueueConsumer {

    /** Name of the fanout exchange both queues are bound to. */
    private static final String EXCHANGE_NAME = "fanout";

    /**
     * Prints messages arriving on {@code fanout.queue1}.
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue1"),
            exchange = @Exchange(name = EXCHANGE_NAME, type = ExchangeTypes.FANOUT)
    ))
    public void fanoutQueue1Consumer(String msg) {
        final String line = "fanoutQueue1Consumer: " + msg;
        System.out.println(line);
    }

    /**
     * Prints messages arriving on {@code fanout.queue2}.
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue2"),
            exchange = @Exchange(name = EXCHANGE_NAME, type = ExchangeTypes.FANOUT)
    ))
    public void fanoutQueue2Consumer(String msg) {
        final String line = "fanoutQueue2Consumer: " + msg;
        System.out.println(line);
    }
}
提供者: FanoutQueuePublisher.java
/**
 * Spring Boot test that publishes one message to the fanout exchange.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutQueuePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single message to the exchange named {@code fanout} with an
     * empty routing key.
     */
    @Test
    public void fanoutQueueTest() {
        final String payload = "hello, fanout queue.";
        final String emptyRoutingKey = "";
        rabbitTemplate.convertAndSend("fanout", emptyRoutingKey, payload);
    }
}
Routing / Direct
消费者:DirectQueueConsumer.java
/**
 * Two listeners, each on its own queue, bound to the direct exchange
 * {@code direct} with explicit routing keys.
 *
 * <p>A direct exchange only delivers a message to queues whose binding key
 * equals the message's routing key, so every binding needs a {@code key}.
 * Without one, messages published with "error" or "warning" match no binding
 * and are dropped.
 */
@Component
public class DirectQueueConsumer {

    /**
     * Prints messages routed to {@code direct.queue1}, which is bound with
     * the key "error" only.
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT),
            value = @Queue(name = "direct.queue1"),
            key = {"error"}
    ))
    public void directQueue1Consumer(String msg) {
        System.out.println("directQueue1Consumer: " + msg);
    }

    /**
     * Prints messages routed to {@code direct.queue2}, which is bound with
     * the keys "info", "warning" and "error".
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT),
            value = @Queue(name = "direct.queue2"),
            key = {"info", "warning", "error"}
    ))
    public void directQueue2Consumer(String msg) {
        System.out.println("directQueue2Consumer: " + msg);
    }
}
提供者: DirectQueuePublisher.java
/**
 * Spring Boot test that publishes one message to the direct exchange.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DirectQueuePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single message to the exchange named {@code direct}; the
     * routing key is also appended to the message text.
     */
    @Test
    public void directQueueTest() {
        // Change this to "warning" to try the other routing key.
        final String routingKey = "error";
        final String payload = "hello, direct queue, " + routingKey;
        rabbitTemplate.convertAndSend("direct", routingKey, payload);
    }
}
Topics
*: 通配恰好一个单词
和#: 通配零个或多个单词(单词之间以"."分隔)
消费者:TopicQueueConsumer.java
/**
 * Two listeners, each on its own queue, bound to the topic exchange
 * {@code topic} with wildcard binding keys.
 *
 * <p>NOTE(review): the other consumer classes in this tutorial are annotated
 * with {@code @Component}; this one uses {@code @Configuration}. Consider
 * aligning it with the others.
 */
@Configuration
public class TopicQueueConsumer {

    /** Name of the topic exchange both queues are bound to. */
    private static final String EXCHANGE_NAME = "topic";

    /**
     * Prints messages arriving on {@code topic.queue1}, which is bound with
     * the pattern {@code *.orange.*}.
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = "*.orange.*"
    ))
    public void topicQueue1Consumer(String msg) {
        final String line = "topicQueue1Consumer: " + msg;
        System.out.println(line);
    }

    /**
     * Prints messages arriving on {@code topic.queue2}, which is bound with
     * the patterns {@code *.*.rabbit} and {@code lazy.#}.
     *
     * @param msg the message body converted to a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = {"*.*.rabbit", "lazy.#"}
    ))
    public void topicQueue2Consumer(String msg) {
        final String line = "topicQueue2Consumer: " + msg;
        System.out.println(line);
    }
}
提供者: TopicQueuePublisher.java
/**
 * Spring Boot test that publishes one message to the topic exchange.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicQueuePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single message to the exchange named {@code topic} with the
     * routing key {@code lazy.orange.rabbit}; the key is also appended to
     * the message text.
     */
    @Test
    public void topicQueueTest() {
        final String routingKey = "lazy.orange.rabbit";
        final String payload = "hello, topic queue. " + routingKey;
        rabbitTemplate.convertAndSend("topic", routingKey, payload);
    }
}