SpringAMQP

消息队列是实现异步通讯的一种方式,我们将从RabbitMQ为例开始介绍SpringAMQT。

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件。

安装与部署

由于RabbitMQ运行需要安装Erlang,为了方便部署,我们采用docker的方式来部署RabbitMq

首先拉取RabbitMQ的镜像,带有management的Tag的说明该镜像含有Web控制台

docker pull rabbitmq:3-management

执行下面的命令来运行RabbitMQ容器

docker run 
-e RABBITMQ_DEFAULT_USER=username 
-e RABBITMQ_DEFAULT_PASS=password 
--name mq 
-p 15672:15672 
-p 5672:5672 
-d 
rabbitmq:3-management

username和password是进入RabbitMQ控制台时使用的账号密码,15672端口是控制台所占用的端口,5672是MQ服务所占用的端口。

RabbitMQ结构

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列中
  • queue: 缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
    在这里插入图片描述

简单队列模型

消息发布者

@SpringBootTest
class PublisherApplicationTests {

    @Test
    void publisher() throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.111.135");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        factory.setVirtualHost("/");
        Connection connection=factory.newConnection();
        //创建通道
        Channel channel=connection.createChannel();
        //创建队列
        String queueName="simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);
        //发布消息
        String message="hello,rabbitmq!";
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("",queueName,null,message.getBytes());
        }

        channel.close();
        connection.close();
    }

消息消费者

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.111.135");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        factory.setVirtualHost("/");
        Connection connection=factory.newConnection();
        //建立通道
        Channel channel=connection.createChannel();
        //消费端也创建队列是为了防止消费端先启动找不到队列
        String queueName="simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);

        //为通道绑定消费者
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息已被处理");
            }
        });
    }
}

其他的模型将在SpringAMPQ中进行介绍

SpringAMQP

AMQP (Adavance Message Queuing Protocol 高级消息队列协议)是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两个部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

SpringAMQP的特点:

  • 提供监听容器用于异步处理入站消息
  • 提供RabbitTemplate用于发送和接收消息
  • 提供RabbitAdmin来自动声明队列,交换机和绑定

依赖引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 192.168.111.135
    port: 5672
    username: username
    password: password
    virtual-host: /

基本模型

简单队列模型

消息发布端

@SpringBootTest
class PublisherApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void publisher(){

        String queueName="simple.queue";
        String message="hello,rabbitmq!";
        rabbitTemplate.convertSendAndReceive(queueName,message);
    }

}

消息消费端

@Component
public class RabbitListenerTest {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String message){
        System.out.println(message);
    }
}

WorkQueue模型

模型特点:多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

在默认情况下,消费者会进行消息预取,预取的数量为无限大,这会导致性能不同的消费者处理相同数量的消息,可以通过设置prefetch来控制消费者预取的消息数量

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

发布订阅模型

发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

常见的exchange类型有:

  • Fanout: 广播
  • Direct: 路由
  • Topic: 话题

exchange只负责消息路由而不进行存储,路由失败则消息丢失

发布订阅模型分三部

  1. 在consumer服务中声明Exchange、Queue、Binding
  2. 在consumer服务中声明多个消费者
  3. 在publisher服务发送消息到Exchange

FanoutExchange

声明Exchange、Queue、Binding。除了通过在配置类中通过@Bean注解绑定队列和交换机外还可以在@RabbitListener注解中绑定,第二种绑定方式将在下一部分使用

@Configuration
public class FanoutConfig {
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("simple.fanout");
    }
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding bingingQueue1(Queue fanoutQueue1,FanoutExchange exchange){
        return BindingBuilder.bind(fanoutQueue1).to(exchange);
    }
    @Bean
    public Binding bingingQueue2(Queue fanoutQueue2,FanoutExchange exchange){
        return BindingBuilder.bind(fanoutQueue2).to(exchange);
    }
}

声明消费者与简单模型没什么差别,就不赘述了。

publisher端将消息发送给交换机有一点小区别

@SpringBootTest
class PublisherApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void publisher(){

        String exchangeName="simple.fanout";
        String message="hello,rabbitmq!";
        //三个参数分别为交换机名、routingkey和消息
        rabbitTemplate.convertSendAndReceive(exchangeName,"",message);
    }

}

DirectExchange

在上一种交换机中发送消息的第二个参数routingkey我们设置为了空,实际上,每一个Queue与Exchange间都可以设定多个bindingkey,通过routingkey参数,交换机将把消息路由到与其匹配的队列中。

@Component
public class RabbitListenerTest {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue1"),
            exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenSimpleQueueMessage(String message){
        System.out.println(message);
    }
}

TopicExchange

TopicExchange与DirectExchange类似,区别是:

  • TopicExchange的routingkey必须是多个单词的列表,并且以英文句号.分隔。
  • bindingkey支持使用通配符,#匹配零或多个单词,*匹配一个单词。

通过以下代码,你很容易可以发现它的特点

@SpringBootTest
class PublisherApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void publisher(){

        String exchangeName="simple.fanout";
        String message="hello,rabbitmq!";
        rabbitTemplate.convertSendAndReceive(exchangeName,"china.weather",message);
    }

}
@Component
public class RabbitListenerTest {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue1"),
            exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.Topic),
            key = "china.*"
    ))
    public void listenSimpleQueueMessage(String message){
        System.out.println(message);
    }
}

消息转换器

我们在使用RabbitTemplate的时候可以发现,它所发送的消息的类型为Object,这意味着它可以发送所有对象。默认它所使用的是jdk的序列化,这样的效率较低。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,只需定义一个MessageConverter类型的Bean即可修改序列化方式。

以jackson的序列化为例

引入jackson的依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

定义MessageConverter类型的Bean

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}