本文档基于 spring AMQP 2.2.1RELEASE REFERENCE 文档
尽管我们之前描述的AMQP模型是通用的,并且适用于所有实现,但是当我们进行资源管理时,详细信息都基于特定的 broker 实现。因此,在本节中,我们专注“ spring-rabbit”模块的实现,因为现在,RabbitMQ是唯一受支持的AMQP实现。
用于管理与RabbitMQ代理的连接的中央组件是ConnectionFactory接口。
ConnectionFactory实现的职责是提供**org.springframework.amqp.rabbit.connection.Connection**
的实例,该实例是**com.rabbitmq.client.Connection**
的包装。我们提供的唯一具体实现是 CachingConnectionFactory ,默认情况下,它将建立一个可以由应用程序共享的单个连接代理。由于与AMQP进行消息传递的“unit of work”实际上是一个“channel”(在某些方面,它类似于JMS中的连接和会话之间的关系),因此可以共享连接。连接实例提供了createChannel方法。 CachingConnectionFactory实现支持这些通道的缓存,并且根据通道是否是事务性的,为通道维护单独的缓存。创建CachingConnectionFactory实例时,可以通过构造函数提供 “hostname”(应该是指的spring提供的host属性)。您还应该提供 “username” 和 “password” 属性。要配置通道缓存的大小(默认值为25),可以调用 setChannelCacheSize 方法。
从1.3版开始,您可以配置CachingConnectionFactory来缓存连接或者仅缓存通道。在这种情况下,对createConnection的每次调用都会创建一个新的连接(或从缓存中检索一个空闲的连接)。关闭连接会将其返回到缓存(如果尚未达到缓存上限)。在此类连接上创建的通道也将被缓存。在某些环境中,使用单独(seperate,这里应该是说一个连接一个通道)的连接可能很有用,例如从一个HA群集消费、连接一个已经连接到不同的群集成员负载平衡器,等等。要缓存连接,请将cacheMode设置为CacheMode.CONNECTION。
重要的是要了解,缓存大小(默认情况下)不是对缓存的限制,而仅仅是说明可以缓存的channel数。缓存大小为10,实际上可以使用任何数量的channel。如果使用了10个以上的channel,则10个channel进入缓存,其余部分则关闭。
从版本1.6开始,默认的channel缓存大小已从1增加到25。在大容量,多线程环境中,较小的缓存意味着将以较高的速率创建和关闭通道。增加默认缓存大小可以避免这种创建缓存的开销。您应该通过RabbitMQ Admin UI监视正在使用的通道,如果看到许多正在创建和关闭的通道,请考虑进一步增加缓存大小。缓存仅按需增长(以适应应用程序的并发要求),因此此更改不会影响现有的小批量应用程序。
从1.4.2版开始,CachingConnectionFactory具有一个名为channelCheckoutTimeout的属性。当此属性大于零时,channelCacheSize成为可在连接上创建的通道数的限制。如果达到限制,则调用线程将阻塞,直到某个通道可用或达到此超时为止,在这种情况下,将抛出AmqpTimeoutException
以下示例显示了如何创建新连接:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用XML时,配置如以下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
可以使用Rabbit命名空间快速而方便地创建ConnectionFactory,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法是可取的,因为框架可以为您选择最佳的默认值。创建的实例是CachingConnectionFactory。请记住,通道的默认缓存大小为25。如果要缓存更多的通道,请通过设置'channelCacheSize'属性设置更大的值。在XML中,如下所示:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
同样,使用命名空间,您可以添加channel-cache-size属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认的缓存模式是CHANNEL,但是您可以将其配置为缓存连接。在以下示例中,我们使用connection-cache-size:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用名称空间提供主机和端口属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在群集环境中运行,则可以使用addresss属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" shuffle-addresses="true"/>
以下示例注入了一个自定义线程工厂,该工厂在线程名称前添加了"Rabbitmq-"前缀:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
Naming Connection 命名连接
从1.7版本开始,提供了ConnectionNameStrategy属性用于注入到AbstractionConnectionFactory中。生成的名称用于特定应用程序对RabbitMQ连接的标识。如果RabbitMQ服务器支持,则连接名称将显示在管理UI中。此值不必唯一,也不能用作连接标识符(例如,在HTTP API请求中)。该值应该是人类可读的,并且是connection_name项下ClientProperty的一部分。您可以使用简单的Lambda,如下所示:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory 参数可用于通过某些逻辑来区分目标连接名称。 默认情况下,将使用AbstractConnectionFactory的beanName,代表对象的十六进制字符串以及内部计数器来生成connection_name。<rabbit:connection-factory>
命名空间组件也提供了connection-name-strategy属性。
SimplePropertyValueConnectionNameStrategy对于ConnectionNameStrategy的实现将连接名称设置为应用程序属性。 您可以将其声明为@Bean并将其注入连接工厂,如以下示例所示:
@Bean
public ConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
属性必须在应用上下文的Environment
中存在。
阻塞的连接和资源限制
连接可能被阻止与来自与内存警报相对应的broker进行交互。 从2.0版开始,org.springframework.amqp.rabbit.connection.Connection
可以与com.rabbitmq.client.BlockedListener
实例一起使用,以通知连接block和unblock的事件。 另外,AbstractConnectionFactory通过其内部的BlockedListener实现发出ConnectionBlockedEvent和ConnectionUnblockedEvent事件。 这些使您可以提供应用程序逻辑,以对broker上的问题做出适当反应,采取一些对应措施。
从2.0.2版开始,RabbitTemplate具有一个配置选项,可以在不使用事务的情况下,自动使用第二个连接工厂。 有关更多信息,请参见这里。 生产者连接的ConnectionNameStrategy与主策略相同,在调用方法的结果后附加.publisher。
从版本1.7.7开始,提供了AmqpResourceNotAvailableException,当SimpleConnection.createChannel()
无法创建Channel时(例如,由于达到了channelMax限制并且高速缓存中没有可用的通道),将抛出此异常。 您可以在RetryPolicy中使用此异常,以在某些回退之后恢复操作。
配置底层的客户端连接工厂
CachingConnectionFactory 使用 Rabbit Client 包 里面的ConnectionFactory实例。 在CachingConnectionFactory上设置的许多配置属性(例如,host,port,username,password,requestedHeartBeat、connectionTimeout)将会被传递到对应的等价属性。 要设置其他属性(例如,clientProperties),您可以定义Rabbit factory 的实例,并使用CachingConnectionFactory的适当构造函数为其提供引用。 如果使用名称空间(xml)的的时候,您需要在connection-factory属性中提供对已配置工厂的引用。 为了方便起见,提供了factory bean来简化Spring应用程序上下文中配置连接工厂。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
RabbitConnectionFactoryBean 和配置 SSL
从版本1.4开始,提供了一个便捷的RabbitConnectionFactoryBean,该类使用依赖项注入在底层客户端连接工厂上实现SSL属性的便捷配置。 其他setter委托给底层的工厂。 以前,您必须以编程方式配置SSL选项。 以下示例显示了如何配置RabbitConnectionFactoryBean:
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>
有关配置SSL的信息,省略keyStore和trustStore配置以通过SSL进行连接而无需证书验证的方法请参见RabbitMQ文档。下一个示例显示如何提供密钥和信任库配置。
sslPropertiesLocation属性是一个Spring Resource,指向包含以下配置的properites文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore和truststore是指向Spring Resources指向的文件。 通常,此properties file由操作系统保护,而应用程序具有read 权限。
从Spring AMQP 1.5版开始,您可以直接在工厂bean上设置这些属性。 如果同时提供了工厂上设置的属性和sslPropertiesLocation,则后者中的属性将覆盖工厂上设置的值。
连接到集群
要连接到群集,需要在CachingConnectionFactory上配置address属性:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
底层的connection factory将尝试依次顺序连接到每个主机。 从版本2.1.8开始,可以通过将shuffleAddresses属性设置为true来使用随机顺序连接。 随机连接应该在建立新连接之前设置。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setShuffleAddresses(true);
return ccf;
}
路由Connection Factory
从1.3版开始,引入了AbstractRoutingConnectionFactory。 该工厂提供了一种机制,可以为多个ConnectionFactory配置映射并在运行时通过一些lookupKey确定目标ConnectionFactory。 通常,实现检查线程绑定上下文。 为了方便起见,Spring AMQP提供了SimpleRoutingConnectionFactory,该工厂从SimpleResourceHolder获取当前线程绑定的lookupKey。 以下示例说明如何在XML和Java中配置SimpleRoutingConnectionFactory:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
资源使用之后解绑是非常重要的,更多信息,可以参见AbstractRoutingConnectionFactory
从1.4版本开始,RabbitTemplate支持SpEL sendConnectionFactorySelectorExpression和receiveConnectionFactorySelectorExpression属性,这些属性在每个AMQP协议交互操作(send, sendAndReceive, receive, receiveAndReply)上进行解析,最终解析为供AbstractRoutingConnectionFactory使用的lookupKey值。 您可以在表达式中使用bean引用,例如@ vHostResolver.getVHost(#root)
。 对于发送操作,要发送的消息是root解析的对象。 对于接收操作,queueName是root解析的对象。
路由算法如下:如果选择表达式为null或被解析为null或提供的ConnectionFactory不是AbstractRoutingConnectionFactory的实例,则所有操作均依赖已有的ConnectionFactory实现。 如果评估结果不为null,但没有适用于该lookupKey的目标ConnectionFactory,并且使用lenientFallback = true
配置AbstractRoutingConnectionFactory,则会发生和先前相同的情况。 在使用AbstractRoutingConnectionFactory的情况下,它会回退到基于defineCurrentLookupKey() 的路由实现。 但是,如果lenientFallback = false
,则抛出IllegalStateException。
命名空间(xml)还提供了<rabbit:template>
组件上的send-connection-factory-selector-expression和receive-connection-factory-selector-expression属性来支持路由key。
另外,从1.4版开始,您可以在listener container中配置routing connection factory。 在这种情况下,队列名称列表将用作查找关键字。 例如,如果使用setQueueNames("thing1","thing2")
配置容器,则查找键为[thing1,thing]
(请注意,键中没有空格)。
从1.6.9版开始,可以通过在 listener container上使用setLookupKeyQualifier将qualifier 添加到查找键(lookup key)。 这样就可以侦听具有相同名称但在不同virtual host中的队列(每个虚拟主机中都有一个connection factory)。
例如,在使用lookup key qualifier thing1和一个container 侦听队列thing2的情况下,可以注册目标连接工厂的查找键为thing1[thing2]。
队列亲和性和 LocalizedQueueConnectionFactory
在群集中使用HA队列时,为了获得最佳性能,您可能需要连接到mater队列所在的物理broker。可以使用多个代理地址配置CachingConnectionFactory。这种情况下客户端尝试按顺序连接来进行故障转移。LocalizedQueueConnectionFactory使用管理插件提供的REST API来确定在哪个节点上是master队列。 然后,它创建(或从缓存检索)仅连接到该节点的CachingConnectionFactory。 如果连接失败,那么将确定新的master节点,并且消费者将连接到新的master节点。 如果无法确定队列的物理位置,则使用默认的连接工厂配置LocalizedQueueConnectionFactory,在这种情况下,它将照常连接到群集。
LocalizedQueueConnectionFactory是一个RoutingConnectionFactory,SimpleMessageListenerContainer使用队列名称作为查找关键字,如上面的 "路由 Connection Factory" 中所述。
以下示例配置显示了如何使用Spring Boot的RabbitProperty配置工厂:
@Autowired
private RabbitProperties props;
private final String[] adminUris = { "https://host1:15672", "https://host2:15672" };
private final String[] nodes = { "rabbit@host1", "rabbit@host2" };
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数addresses,adminUris和nodes都是数组。 这些是适当的,因为当容器尝试连接到队列时,它确定队列在哪个node上是master队列,并连接到同一数组位置对应的的地址。
Publisher Confirms and Returns
通过将CachingConnectionFactory的PublisherConfirmType属性设置为ConfirmType.CORRELATED
并将PublisherReturns属性设置为'true'
,可以开启发布消息的确认(具有相关性)和返回消息。
设置这些选项后,由工厂创建的Channel实例将被包装在PublisherCallbackChannel中,该Publisher用于增加回调。当获得这样的channel时,客户端可以向该channel注册PublisherCallbackChannel.Listener。 PublisherCallbackChannel实现包含将确认或返回到适当的listener的逻辑。
Connection and Channel Listeners
连接工厂支持注册ConnectionListener和ChannelListener实现。 这使您可以接收有关Connection和Channel相关事件的通知。(当建立Connection时,RabbitAdmin使用ConnectionListener来执行声明(declarations)-有关更多信息,请参见这里)。 以下代码显示了ConnectionListener接口定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从2.0版开始,org.springframework.amqp.rabbit.connection.Connection
对象可以支持与com.rabbitmq.client.BlockedListener
实例一起使用,使connection被block和unblock的事件进行通知。 以下示例显示ChannelListener接口定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
记录通道关闭事件(Logging Channel Close Events)
1.5版引入了一种机制,使用户可以控制日志记录级别。
CachingConnectionFactory使用默认策略来记录通道关闭,如下所示:
- 正常通道关闭(200 OK)不会被记录。
- 如果通道由于被动队列(passive queue)声明失败而关闭,那么它将在调试级别记录。
- 如果由于消费者排他使
basic.consume
被拒绝进而导致的通道关闭,那么它将以INFO级别记录。 - 其他所有日志均以ERROR级别记录。
若要修改此行为,可以在CachingConnectionFactory的closeExceptionLogger属性中注入自定义的ConditionalExceptionLogger。
运行时缓存属性(Runtime Cache Properties)
从1.6版开始,CachingConnectionFactory现在通过getCacheProperties方法提供缓存统计信息。 这些统计信息可用于调整缓存以在生产中对其进行优化。 例如,the high water marks可用于确定是否应增加缓存大小。 如果等于缓存大小,则可能要考虑进一步增加。 下表描述了CacheMode.CHANNEL
属性:
Table 1. Cache properties for CacheMode.CHANNEL
属性 | 意义 |
---|---|
connectionName | 由ConnectionNameStrategy生成的连接的名称。 |
channelCacheSize | 当前配置的允许空闲的最大通道数。 |
localPort | 连接的本地端口(如果有)。这通常与rabbitMQ Admin UI上的连接和通道是相关的。 |
idleChannelsTx | 当前空闲(缓存)的事务通道的数量。 |
idleChannelsNotTx | 当前空闲(缓存)的非事务通道的数量。 |
idleChannelsTxHighWater | 同时空闲(缓存)的最大事务通道数。 |
idleChannelsNotTxHighWater | 同时空闲(缓存)的非事务通道的最大数量。 |
接下来的表格描述了CacheMode.CONNECTION
属性:
Table 2. Cache properties for CacheMode.CONNECTION
属性 | 意义 |
---|---|
connectionName: | 由ConnectionNameStrategy 生成的名字. |
openConnections | 表示与broker连接的连接对象的数量。 |
channelCacheSize | 当前配置的允许空闲的最大通道数。 |
connectionCacheSize | 当前配置的允许空闲的最大连接数。 |
idleConnections | 当前空闲的连接数。 |
idleConnectionsHighWater | 同时空闲的最大连接数。 |
idleChannelsTx: | 连接当前空闲(缓存)的事务通道的数量。 属性名称的localPort部分与RabbitMQ Admin UI上的连接和通道对应。 |
idleChannelsNotTx: | 连接当前空闲(缓存)的非事务通道的数量。 属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接和通道对应。 |
idleChannelsTxHighWater: | 同时空闲(缓存)的最大事务通道数。 属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接和通道关联。 |
idleChannelsNotTxHighWater: | 同时空闲(缓存)的非事务通道的最大数量。 属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接和通道关联。 |
cacheMode
属性 (CHANNEL
or CONNECTION
) 也被包含在返回对象内.
Figure 1. JVisualVM 示例
RabbitMQ 自动 Connection/Topology 恢复
Since the first version of Spring AMQP, the framework has provided its own connection and channel recovery in the event of a broker failure. Also, as discussed in Configuring the Broker, the RabbitAdmin
re-declares any infrastructure beans (queues and others) when the connection is re-established. It therefore does not rely on the auto-recovery that is now provided by the amqp-client
library. Spring AMQP now uses the 4.0.x
version of amqp-client
, which has auto recovery enabled by default. Spring AMQP can still use its own recovery mechanisms if you wish, disabling it in the client, (by setting the automaticRecoveryEnabled
property on the underlying RabbitMQ connectionFactory
to false
). However, the framework is completely compatible with auto-recovery being enabled. This means any consumers you create within your code (perhaps via RabbitTemplate.execute()
) can be recovered automatically.
从Spring AMQP的第一个版本开始,该框架在代理发生故障的情况下提供了自己的连接和通道恢复。 另外,如 Configuring the Broker中所述,当重新建立连接时,RabbitAdmin会重新声明所有底层的Bean(队列和其他)。 因此,它不依赖于amqp-client库现在提供的自动恢复。 Spring AMQP现在使用amqp-client的4.0.x版本,该库默认情况下启用了auto-recovery。 如果需要,Spring AMQP仍可以使用其自己的恢复机制,只需在amqp-client客户端将其禁用(通过将基础RabbitMQ connectionFactory的automaticRecoveryEnabled属性设置为false)。 但是,该框架与启用的自动恢复完全兼容。 这意味着您在代码中创建的所有消费者(可能通过RabbitTemplate.execute())都可以自动恢复。