本文档基于 spring AMQP 2.2.1RELEASE REFERENCE 文档

与Spring框架和相关项目提供的许多其他高级抽象一样,Spring AMQP提供了一个"Template",该Template起着核心作用。定义的主要操作接口为AmqpTemplate。这些操作只涵盖了发送和接收消息的一般行为。换句话说,它们并不是任何实现所特有的,从名称中就带有"AMQP"就能看出来。另一方面,该接口的实现与AMQP协议的实现绑定在一起。与JMS(本身是接口级API)不同,AMQP是线级(wire-level)协议。该协议的实现都提供了自己的客户端库,因此模板接口的每种实现都取决于特定的客户端库。当前,只有一个实现:RabbitTemplate。在下面的示例中,我们经常使用AmqpTemplate。但是,当您查看配置示例或实例化模板或调用设置程序的任何代码摘录时,您会看到实现类型(例如,RabbitTemplate)。
像之前提到的,AmqpTemplate 接口定义了所有发送和接收消息的基本操作。我们会浏览消息的发送和接收在Sending MessagesReceiving Messages两节.

添加重试功能(Adding Retry Capabilities)

从1.3版开始,您现在可以将RabbitTemplate配置为使用RetryTemplate来帮助处理broker连接性问题。 有关完整信息,请参见spring-retry项目。 以下仅是一个使用指数退避策略和默认SimpleRetryPolicy的示例,该示例在将异常抛出给调用方之前进行了三次尝试。

以下示例使用XML名称空间:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下是Java中使用@Configuration注解的示例:

@Bean
public AmqpTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

从1.4版本开始,除了retryTemplate属性外,RabbitTemplate还支持recoveryCallback参数。该参数用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)的第二个参数.

retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在这种情况下,您不应该将RetryTemplate注入RabbitTemplate中。

生产是异步的—如何检测成功和失败(Publishing is Asynchronous — How to Detect Successes and Failures)

发布消息是一种异步机制,默认情况下,RabbitMQ会删除无法路由的消息。 为了成功发布,您将会收到异步的确认,如Publisher Confirms and Returns所述。 考虑两种故障情况:

  • 发布到exchange ,但没有匹配的目标队列。
  • 发布到不存在的exchange。

第一种情况在生产者返回中说明了,具体见 Publisher Confirms and Returns

对于第二种情况,该消息将被丢弃,并且不会生成返回。 地层的通道将被关闭并抛出一个异常。 默认情况下,此异常会有日志,您也可以向CachingConnectionFactory注册ChannelListener以获得此类事件的通知。 以下示例显示如何添加ConnectionListener:

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以检查signal的reason属性以确定发生的原因。
要检测在发送线程上的异常,可以在RabbitTemplate上设置ChannelTransacted(true)并在txCommit方法上检测到异常。 但是,transactions会严重影响性能,如果只是一个用例的话,请仔细考虑性能下降(impede performance)这一点。

生产者确认和返回(Publisher Confirms and Returns)

RabbitTemplate实现了AmqpTemplate,支持生产者确认并返回。

对于返回的messages,templates的 mandatory 属性必须设置为true,或者对于一些特殊的messages必须将expired-expression评估为true。 此功能要求CachingConnectionFactory的PublisherReturns属性设置为true(请参阅Publisher Confirms and Returns)。 通过调用setReturnCallback(ReturnCallback callback)注册RabbitTemplate.ReturnCallback,将返回值发送到客户端。 回调必须实现以下方法:

void returnedMessage(Message message, int replyCode, String replyText,
          String exchange, String routingKey);

每个RabbitTemplate只支持一个 ReturnCallback。

对于生产者确认(publisher confirms,也称为publisher acknowledgements),template 需要一个CachingConnectionFactory,并将其的PublisherConfirm属性设置为ConfirmType.CORRELATED。 通过调用setConfirmCallback(ConfirmCallback callback)注册RabbitTemplate.ConfirmCallback,将确认发送到客户端。 回调必须实现此方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData是客户端在发送原始消息时提供的对象。 ack(确认接收)对应于ack字段的值是true,而对于nack则是false。 对于nack实例,原因可能包含在nack的reason 字段中(如果在生成nack时可用)。 一个示例是将消息发送到不存在的exchange时。 在这种情况下,broker 将关闭channel。 reason 中包含了关闭的原因。 The cause已在1.4版中添加。

一个RabbitTemplate只能支持一个 ConfirmCallback 。

从2.1版开始,CorrelationData对象具有一个ListenableFuture,可用来获取结果,而不是在模板上使用ConfirmCallback。 以下示例显示如何配置CorrelationData实例:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

由于它是ListenableFuture ,因此可以在准备就绪时用get()方法获取结果,也可以为异步回调添加侦听器。 Confirm对象是一个简单的bean,具有2个属性:ack和reason(对于nack实例)。broker生成的nack实例未填充reason 字段。 由框架生成的nack实例则包含reason 字段(例如,在ack实例未完成时关闭连接)。
另外,当同时启用确认和返回( confirms and returns)时,将使用返回的消息填充CorrelationData。 在使用ack设置CorrelationData的future之前发生确认和返回是被承诺的。