• 使用交换器和队列
    • #exchangeDeclare 方法详解">#exchangeDeclare 方法详解
      • #exchangeDeclareNoWait">#exchangeDeclareNoWait
      • #exchangeDeclarePassive">#exchangeDeclarePassive
    • #exchangeDelete 方法详解">#exchangeDelete 方法详解
    • #queueDeclare 方法详解">#queueDeclare 方法详解
    • #queueDelete 方法详解">#queueDelete 方法详解
    • #queueBind 方法详解">#queueBind 方法详解
    • #exchangeBind">#exchangeBind
    • #何时创建">#何时创建

    使用交换器和队列

    交换器和队列再 AMQP 中高层面的构建模块,应用程序需确保在使用他们的时候,就已经存在了,在使用之前需要先声明(declare)他们
    创建临时队列:

    1. channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
    2. final String queue = channel.queueDeclare().getQueue();
    3. channel.queueBind(queue, EXCHANGE_NAME, routingKey);
    4. Copied!

    1
    2
    3

    上面创建一个持久化的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由 RabbitMQ 自动生成)。这里交换器和队列没有设置特殊的参数。
    上面展示了:如何使用路由键将队列和交换器绑定起来。上面自动生成的队列具有特性:在在该 Connection 可用,同一个 Connection 的不同 Channel 可共用,在 Connection 断开时,会自动删除该队列。
    想要在应用中共享一个队列,可用创建持久化队列
    创建持久化队列:

    1. channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
    2. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    3. channel.queueBind(queue, EXCHANGE_NAME, routingKey);
    4. Copied!

    1
    2
    3

    分配一个固定的队列名称,并设置持久化、非排他的、非自动删除的队列。
    生产者和消费者都可以声明一个交换器或则队列,如果尝试声明一个已经存在的交换器或队列(只要声明的参数完全匹配已存在的交换器或队列),RabbitMQ 则什么都不做,直接返回成功。如果参数不匹配则会抛出异常

    #exchangeDeclare 方法详解

    声明创建一个交换器
    该方法有多个重载方法,都是由转调下面这个方法的

    1. public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
    2. boolean durable,
    3. boolean autoDelete,
    4. boolean internal,
    5. Map<String, Object> arguments)
    6. Copied!

    1
    2
    3
    4
    5

    返回 Exchange.DeclareOk 标识成功声明了一个交换器

    • exchange:交换器名称
    • type:交换器类型;常见的有:fanout、direct、topic…com.rabbitmq.client.BuiltinExchangeType 类定义了交换器类型
    • durable:是否持久化持久化将交换器存盘,服务重启时不会丢失相关信息
    • autoDelete:是否自动删除自动删除的前提是:至少有一个队列或则交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或则交换器都与此解绑注意:这里自动删除,不是当连接断开时,自动删除这个交换器。确定是这个意思吗?看 API 描述,当服务器不使用时自动删除
    • internal:是否内置的如果是内置的交换器,客户端程序无法直接发送消息到这个交换器 中,只能通过交换器路由 到交换器这种方式。
    • arguments:其他一些结构化参数第 4 章会讲解

      #exchangeDeclareNoWait

      还有一个与之对应的 noWait 的方法
    1. void exchangeDeclareNoWait(String exchange,
    2. BuiltinExchangeType type,
    3. boolean durable,
    4. boolean autoDelete,
    5. boolean internal,
    6. Map<String, Object> arguments) throws IOException;
    7. Copied!

    1
    2
    3
    4
    5
    6

    nowait 参数是 AMQP 中 Exchange.Declare 命令的参数,意思是不需要服务器返回,注意该参数的返回值是 void。
    考虑一个场景:在声明完一个交换器后(实际服务器还未完成交换器的创建),那么此时客户端就使用这个交换器,必然会发生异常。所以,如果没有特殊的应用场景,不建议使用该方法。

    #exchangeDeclarePassive

    1. Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
    2. Copied!

    1

    用来监测交换器是否存在。如果不存在则抛出异常 404 channel exception,同时 Channel 也会被关闭

    #exchangeDelete 方法详解

    删除交换器

    1. void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
    2. Copied!

    1

    • exchange:交换器名称
    • isUnused:设置为 true ,则只有交换器没有被使用时,才被删除。

      #queueDeclare 方法详解

    1. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    2. Map<String, Object> arguments) throws IOException;
    3. void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    4. Map<String, Object> arguments) throws IOException;
    5. Copied!

    1
    2
    3
    4

    • queue:队列名称
    • durable:是否持久化
    • exclusive:是否排他当一个队列被声明为排他队列,该队列 仅对首次声明它 的连接可见,并在连接断开时自动删除。这里需要注意一点:就算是持久化的,一旦连接关闭,这个排他队列也会被自动删除。适合场景:使用一个客户端同时发送和读取消息
    • autoDelete:是否自动删除与交换器定义一致;
    • arguments:设置队列的其他一些参数如 x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority

    注意
    生产者和消费者都可以使用 queueDeclare 声明一个队列,但是如果消费者已经在该 Channel 上订阅了另外一个队列,则无法再声明队列了,需要先取消订阅,然后将 Channel 设置为「传输」模式后,才能声明队列

    #queueDelete 方法详解

    1. Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    2. Copied!

    1

    前面与交换器一直,ifEmpty 则表示当队列为空(队列里没有消息)时才能删除。

    1. // 清空队列中的内容,不是删除队列本身
    2. Queue.PurgeOk queuePurge(String queue) throws IOException;
    3. Copied!

    1
    2

    #queueBind 方法详解

    1. Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    2. Copied!

    1

    • queue:队列名
    • exchange:交换器名称
    • routingKey:用来绑定队列和交换器的路由键
    • arguments:定义绑定的一些参数

    还可以进行解绑

    1. Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    2. Copied!

    1

    #exchangeBind

    将交换器与交换器绑定

    1. Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    2. Copied!

    1

    绑定之后,消息从 source 交换器转发到 destination 交换器。示例代码如下

    1. channel.exchangeDeclare("source", "direct", false, true, null);
    2. channel.exchangeDeclare("destination", "fanout", false, true, null);
    3. channel.exchangeBind("destination", "source", "exKey");
    4. channel.queueDeclare("queue", false, false, true, null);
    5. channel.basicPublish("source", "exKey", null, "exToExDemo".getBytes());
    6. Copied!

    1
    2
    3
    4
    5

    image.png
    其实把交换器当成了一个中间转发。
    解绑交换器与交换器的绑定

    1. Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    2. Copied!

    1

    #何时创建

    RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS,只需要看队列即可。在实际业务应用中,需要对锁创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估平均值和峰值,以便在固定硬件资源的情况下进行合理有效的分配。
    按照 RabbitMQ· 官方建议,生产者和消费者都应该尝试创建(声明操作)队列。如果你的架构设计能保证队列再使用时就已经被创建好了,则不用遵循该建议。