Wednesday, February 08, 2012

ActiveMQ Recommendations (3) Server Configuration

Server resource
ActiveMQ use the following parameter to configure system resource: 
<systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="256 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="10 gb" />
        </storeUsage>
        <tempUsage>
          <tempUsage limit="1 gb"/>
        </tempUsage>
      </systemUsage>
</systemUsage>

The memory is used for non-persistent messages, storeUsage is for persistent messages, and the tempUsage is used to store non-persistent messages when memory heap reaches the limit.
 JMS features Configuration
1. nio protocol
The NIO (new I/O) protocol is introduced to Java as an alternative approach to network programming by access low-level I/O operations of OS. And it allows the application to use the same resources to handle more network clients and heavier loads on JMS servers.
It’s supported by ActiveMQ and can be used to improve the broker performance, compared with tcp protocol.
In order to configure nio protocol,
a. From the broker side, we need to modify the transport connector and network connect uri by replacing “tcp” with “nio”.
b. From the broker clients, biztalk or outm servers, the JMS provider should also be configured with nio url.

2. Dynamic message forwarding
Dynamic message forwarding can be used to avoid network overhead. By using “dynamicallyIncludedDestinations”, messages will only be forwarded to remote brokers when there are active consumers.
The configuration can be done as follows:
      <networkConnector name="outm-nc-general" 
        uri="static:(tcp://&lt;host>:<port>,tcp://<host>:<port>)"
         conduitSubscriptions="false"
         decreaseNetworkConsumerPriority="false"
     duplex="false">
            <excludedDestinations>
              <queue physicalName=">"/>
          </excludedDestinations>
           <dynamicallyIncludedDestinations>
              <queue physicalName="YELEI.TEST"/> 
          </ dynamicallyIncludedDestinations >
      </networkConnector>
    </networkConnectors>

3. Destination policy
a. Message expiration
Normally messages would expire if they stay in the queue for a certain period so that the broker/queue performance would not be impact. In ActiveMQ, by default, messages would expire in 30 seconds, meaning, they would be moved to dead letter queue after 30 seconds if they are persistent. (In the current 5.5.1 version, the behavior varies depending on whether message consumers are present for the queue. If so, messages that are not consumed in time go to the DLQ. Otherwise, messages stays in the broker storge if no consumers are connected to the queue) For non-persistent messages, they would be discarded after expiration. It can also be configured to move non-persistent messages to DLQ after expiration.
By default, all the expired messages go to the queue called ActiveMQ.DLQ. However, it’s not convenient when there are multiple queues to manage. Normally some dead letter queues are specified for individual queues. For example:
<destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue="YELEI.TEST" expireMessagesPeriod =”60000>
          <deadLetterStrategy>
               <individualDeadLetterStrategy
              queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent=”true/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

In the above example, processNonPersistent enables the storing of expired non-persistent messages in the DLQ. This property is also available for sharedDeadLetterStrategy. expireMessagesPeriod sets the expiration time as 60 seconds. If the value is set to 0, it means messages will never expire.

b. Producer flow control
Enabling producer flow control would help when broker reaches resource limit such as memory or storage. It would diligently keep producers waiting until the broker has resource to continue processing inbound messages.
It can be configured as follows:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.TEST" producerFlowControl="true" memoryLimit="10mb">
            ……
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>

In system usage configuration, it can be configured to send out a failure after a certain amount of time:
<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
                 ……

c. Message threading
By default, in a scenario where a message is sent by producer to ActiveMQ, and then delivered to the consumer, there are 5 threads being involved sequentially: producer session thread, activemq inbound transportation thread, activemq dispatch thread, outbound transportation thread, consumer session thread.
The product design of ActiveMQ tries to use multiple threading and asynchronous communication as much as possible to maximize the performance.
If we set the connection to the broker to be synchronous by using “tcp://jmshost?async=false”, broker will disable the default asynchronous communication, so that only 3 thread are involved, producer session thread, activemq dispatch thread and consumer thread. In such a scenario, the activemq inbound transportation thread will not be created because the producer controls the transportation and it’ll wait for an acknowledgement when the transportation succeeds. The dispatch thread will also wait until the message is sent to the consumer.
From the consumer perspective, an option to streamline the processing of large amount of small messages is to use optimizedDispatch option. This option disables the creation of the session thread inside the consumer, and the transportation thread delivers messages directly to message listener. The configuration can be done as follows:
   <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.TEST" optimizedDispatch =”true>
            ……
          </policyEntry>
        </policyEntries>
      </policyMap>
   </destinationPolicy>

d. Message cursors
In a normal message produce-consume scenario, after a broker receives a message, the message is first persisted (by default), and gets sent to dispatch queue in memory, ready for the consumer clients to pick up.
However, when consumers are slower than producers, ActiveMQ would add an additional message cursor as the buffer between persistence storage and dispatch queue. This is the default scenario, called store-based cursor.
There are 2 alternatives for this: vmQueueCursor and fileQueueCursor.
Instead of pointing to messages in the message store, vmQueueCursor point to messages in memory. This can be efficient but it is not suitable for very slow consumers or temporarily-inactive consumers.
As a comparison, fileQueueCursor refers to messages stored in temporary files. It can be more efficient than the store-based cursor when the message stores are not performing due to huge amount of messages. The same mechanism as fileQueueCursor is used to store non-persistent messages when memory limit is reached.
The configuration is done per destination/queue. An example is as follows:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.>">
            <pendingQueuePolicy>
                <vmQueueCursor />
                <!-- fileQueueCursor-->
            </pendingQueuePolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
 </destinationPolicy>

e. Message Pending
ActiveMQ internally streams messages for consumers in memory of consumers to increase message processing performance. By default, the prefetch size for persistent queue and non-persistent queue are both 1000. In addition, inside the broker, messages can be stored in memory as pending messages in case that consumers reach prefetch size.
Once the number of pending messages reaches the limit and new messages still come in, old messages will be discarded. If the value is set to “-1”, the messages will never be discarded.
The number of messages kept in the broker for each queue can be configured as follows:
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.>">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="100"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
      </destinationPolicy>

 An alternative to constantPendingMessageLimitStrategy is prefetchRatePendingMessageLimitStrategy, which specifies the multiplier of the prefetch size from consumers.
g. Total ordering
Total ordering addresses the same issue as exclusive consumer and guarantees the order of message processing. The difference is that multiple consumers can exist for the queue. The cost compared to exclusive consumer is the overhead to synchronize the information between consumer clients.
It can be done by means of destination policy:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.>">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>

4. Large XML processing
In ActiveMQ, BLOB message can be used to deliver large xml files. However, this feature might not be supported by out-of-the-box functionality of the queue clients and customization is required. It's recommended to use a web service on the ActiveMQ server as the large message producer, because web service is supported by most platforms in the market, and the parameters to invoke the web service are only information about queue name and file url, which doesn’t cause much network overhead. The jetty server bundled with ActiveMQ also supports web service implementations like this.
a. Large message producer:
 i. Define a wsdl file that contains the interface to enqueue big file, the parameter should contain the file url and the destination queue name.
 ii. Implement the wsdl interface with a java application, and expose it as a web service.
 iii. Deploy the web service to the ActiveMQ server.
b. Large message consumer:
Implementation varies depending on which platform is used for development.

2 comments:

Unknown said...

Good info. If I want to use default store base cursor, what is the configuration in the activemq.xml file? Do I just skip it and set PFC to false? Thanks.

Nazar said...

"expireMessagesPeriod" is the period (in ms) of checks for message expiry on queued messages, value of 0 disables. Proof link http://activemq.apache.org/per-destination-policies.html