ActiveMQ in Action (2)

billmingchen 2008-02-25

2.2 Transport    ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下简单介绍其中的几种,更多请参考Apache官方文档。

2.2.1 VM Transport

VMtransport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM连接的客户会启动一个embedVMbroker,接下来所有使用相同的brokername的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。

    以下是配置语法:

   vm://brokerName?transportOptions

   例如:vm://broker1?marshal=false&broker.persistent=false

   Transport Options的可选值如下:

Option NameDefault ValueDescription
MarshalfalseIf true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the wireFormat
createtrueIf the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1
broker.*All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information

   以下是高级配置语法:

   vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions

   vm:broker:(tcp://localhost)?brokerOptions

    例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

    Transport Options的可选值如下:

Option NameDefault ValueDescription
marshalfalseIf true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the wireFormat

   使用配置文件的配置语法:  

vm://localhost?brokerConfig=xbean:activemq.xml

    例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml

   使用Spring的配置:

<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
  <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
  <property name="start" value="true" />
</bean>

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
  <property name="brokerURL" value="vm://localhost"/>
</bean>

   如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。

2.2.2 TCP Transport

TCPtransport允许客户端通过TCPsocket连接到远程的broker。以下是配置语法:

tcp://hostname:port?transportOptions

    Transport Options的可选值如下:
Option NameDefault ValueDescription
minmumWireFormatVersion0The minimum version wireformat that is allowed
tracefalseCauses all commands that are sent over the transport to be logged
useLocalHosttrueWhen true, it causes the local machines name to resolve to "localhost".
socketBufferSize64 * 1024Sets the socket buffer size in bytes
soTimeout0sets the socket timeout in milliseconds
connectionTimeout30000A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored.
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information

   例如:tcp://localhost:61616?trace=false

2.2.3 Failover Transport

FailoverTransport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failovertransport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:

failover:(uri1,...,uriN)?transportOptions

failover:uri1,...,uriN

    Transport Options的可选值如下:
Option NameDefault ValueDescription
initialReconnectDelay10How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay30000The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOfftrueShould an exponential backoff be used between reconnect attempts
backOffMultiplier2The exponent used in the exponential backoff attempts
maxReconnectAttempts0If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomizetrueuse a random algorithm to choose the URI to use for reconnect from the list provided
backupfalseinitialize and hold a second transport connection - to enable fast failover

   例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

2.2.4 Discovery transport

Discoverytransport是可靠的tranport。它使用Discoverytransport来定位用来连接的URI列表。以下是配置语法:

discovery:(discoveryAgentURI)?transportOptions

discovery:discoveryAgentURI

    Transport Options的可选值如下:
Option NameDefault ValueDescription
initialReconnectDelay10How long to wait before the first reconnect attempt
maxReconnectDelay30000The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOfftrueShould an exponential backoff be used btween reconnect attempts
backOffMultiplier2The exponent used in the exponential backoff attempts
maxReconnectAttempts0If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

   例如:discovery:(multicast://default)?initialReconnectDelay=100       为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:

<broker name="foo">
   <transportConnectors>
      <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>
    ...
</broker>

   在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。

   在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:

(ActiveMQConnection)connection).addTransportListener(new TransportListener() {
    public void onCommand(Object cmd) {
    }

    public void onException(IOException exp) {
    }

    public void transportInterupted() {
        // The transport has suffered an interruption from which it hopes to recover.
    }

    public void transportResumed() {
        // The transport has resumed after an interruption.
    }
});

相关推荐

xinglun / 0评论 2020-06-14