Monday, February 13, 2012

Advanced Features of Messaging Products I

Introduction
Enterprise messaging software came onto the market in the late 1980s. With the evolution of Java, the “Java Message Service” (JMS) and “Service Oriented Architecture” (SOA) platforms in the 2000s, enterprise messaging is nowadays not only about communication between applications; it has also become a pattern or style of integration in the enterprise application landscape. Today we call messaging software “Message Oriented Middleware” (MOM), which is defined as “software or hardware infrastructure supporting sending and receiving messages between distributed systems” [1].
Many projects adopt MOM software because they need guaranteed message delivery, flexible routing and message transformation. At the same time, quite a few useful features of these products are often overlooked, although they could inspire good integration ideas. We call them “advanced” features in the sense that they are not core features (message routing and delivery) themselves, but they strengthen those core features. This article briefly explains some features of MOM products, using ActiveMQ as the example. Hopefully it sparks ideas for simple and innovative integration solutions in some projects.
MOM versus JMS
One major problem with MOM software in its early days was the lack of standards, until Java EE introduced the JMS specification, which gained popularity and the support of most MOM products. As a result, people often call MOM software “JMS products”. However, MOM and JMS are different concepts.
Most MOM products implement JMS (Microsoft MSMQ does not), and they can do much more than that. The JMS specification focuses on JMS elements and message structure. By contrast, an emerging protocol called the “Advanced Message Queuing Protocol” (AMQP) defines features regarding “message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security” [2]. Some MOM products, such as OpenAMQ, StormMQ and RabbitMQ, already claim AMQP support.
The features described in this article are not part of the standard JMS specification. However, they are supported by various MOM products, although the terminology may differ. In this article, several features offered by ActiveMQ are selected to explain alternative solutions to common business problems. ActiveMQ has many more features than those described here; for the complete list, see [3].
Case study: AccNet
An imaginary company called AccNet is introduced to help explain the features. AccNet is a telecom provider that offers internet connections and related services to customers in various countries. Applications from various business units, such as the order department, dispatch centers, billing department and credit department, communicate with each other via ActiveMQ.
AccNet has the following functional requirements related to its order fulfillment and billing logic:
1. Order fulfillment may involve an engineer service or a hardware dispatch service. AccNet has several service teams and dispatch centers within each country. Each service request must be processed by exactly one of the service teams, depending on the workload of each team.
2. After orders are received and fulfilled, the order/account information should be sent both to the billing department, where customer bills are determined, and to the credit department, which is responsible for processing the actual payment from customers.
3. A complete order might consist of several order messages, for example orderMsg1, orderMsg2, and so on. These messages are produced sequentially and must be consumed in the same sequence, because orderMsg2 cannot be processed before orderMsg1. This order of processing is guaranteed with a single consumer. However, producer and consumer applications are normally deployed in clustered environments, and the consumption order must match the production order across the whole cluster.
Solutions with Features Explained
The features explained here fall into four categories, as defined in [3]: destination features, consumer features, dispatch features and message features. This article is the first of two SOA-focused articles, and it covers some of ActiveMQ's destination features.
1. Destination Features
1.1 Composite destination
A composite destination is a virtual destination that represents a list of physical destinations. This feature can effectively address the second business requirement.
Suppose the billing department of AccNet Benelux has a queue called NL.Order.Billing, and the credit department has queues called NL.Order.Credit and BG.Order.Credit. (These queues are not necessarily in different geographical locations. If they are, features such as message forwarding should be used as well.) A plain solution can be implemented on the producer side: the producer sends every order message to each of the three queues separately.
By applying a composite destination, the solution can be simplified: the producer sends each message once, to a composite destination that lists all three queues.
From the producer's perspective, it no longer has to send messages to each queue; the task of delivering messages to the individual queues is shifted to the message broker. This is more efficient because the message is sent only once, which reduces network traffic and resource usage on the producer side.
Note also that the destinations do not all have to be queues. A composite destination can consist of, for example, one queue and one topic; in that case, a prefix indicating the destination type is added. For example, a composite destination can be “queue://NL.Order.Credit,topic://NL.Order.Audit”.
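To make the producer-side difference concrete, here is a small in-memory model in Java. It is a hypothetical sketch, not ActiveMQ code: the class and method names are made up, and it only mimics the fan-out the broker performs. The producer makes a single send() call with the composite name, and the model stores one copy per physical destination. With a real ActiveMQ client, the producer would simply send to a queue created with the comma-separated name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how a broker expands a composite destination (not ActiveMQ code).
class CompositeDestinationModel {
    // Physical destinations, keyed as "queue://NAME" or "topic://NAME", with their stored messages.
    final Map<String, List<String>> store = new LinkedHashMap<>();

    // Split a composite name on "," and normalize each part to "queue://" or "topic://".
    // Parts without a prefix are treated as queues, as when the producer sends to a queue.
    static List<String> parse(String composite) {
        List<String> destinations = new ArrayList<>();
        for (String part : composite.split(",")) {
            String p = part.trim();
            if (p.startsWith("queue://") || p.startsWith("topic://")) {
                destinations.add(p);
            } else {
                destinations.add("queue://" + p);
            }
        }
        return destinations;
    }

    // The producer calls send() once; the broker stores one copy per physical destination.
    void send(String composite, String message) {
        for (String destination : parse(composite)) {
            store.computeIfAbsent(destination, d -> new ArrayList<>()).add(message);
        }
    }
}
```

For example, broker.send("NL.Order.Billing,NL.Order.Credit,BG.Order.Credit", "order-42") leaves one copy of order-42 in each of the three queues.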
1.2 Wildcard destination
Wildcards add flexibility when defining destinations. Destination names are treated as elements separated by “.”: “*” matches any single element, and “>” matches any remaining element or elements. For example, the destination “NL.Order.*” matches queue names such as NL.Order.Credit and NL.Order.Billing, and “BG.>” matches any queue name that starts with “BG.”, such as “BG.Order.Credit”, “BG.Customer”, and so on.
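The matching rules above can be sketched as a short Java function. This is a hypothetical re-implementation for illustration, not ActiveMQ's own filter code, and it assumes that “>” must be the last element of a pattern and must consume at least one element of the name.

```java
// Hypothetical ActiveMQ-style wildcard matcher, for illustration only.
// Names are split into elements on ".": "*" matches exactly one element,
// ">" matches one or more trailing elements.
class WildcardMatcher {

    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" must be the last pattern element and needs at least one name element left.
                return i == p.length - 1 && n.length > i;
            }
            if (i >= n.length) {
                return false; // the name is shorter than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false; // a literal element differs
            }
        }
        return p.length == n.length; // no ">" in the pattern, so the lengths must agree
    }
}
```

With this sketch, matches("NL.Order.*", "NL.Order.Credit") is true, while matches("NL.Order.*", "NL.Order.Credit.Late") is false because “*” covers only one element.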
1.3 Virtual topic
The virtual topic feature offers an alternative to composite destinations. It is particularly useful when you want to scale up the number of consumers, and it solves the problem from the consumer's perspective.
Requirement 2 needs the same message to be delivered to multiple applications/message clients. Instead of the “queue” concept used in [1.1 Composite destination] and [1.2 Wildcard destination], we can use the “topic” concept to meet the requirement.
Suppose the order messages are sent to a topic called “NL.Order”, and the applications of both the credit department and the billing department subscribe to this topic. However, this plain publish-subscribe solution has two downsides:
• A normal topic subscriber is not durable, so messages published while it is disconnected are lost. This is not acceptable to the business.
• If we make the subscribers durable, the solution does not scale, because only one consumer can be active for a given subscription name and ClientId.

By using a virtual topic, we can have multiple consumers for each application, which businesses usually require.
This solution may look a bit complex, but in practice it is quite easy to achieve. Only two steps are required:
• Use NL.Order as a virtual topic rather than a normal JMS topic. By default, a topic is treated as a virtual topic when its name starts with “VirtualTopic.”, so the producer publishes to VirtualTopic.NL.Order (the prefix is customizable).
• The message clients are actually queue consumers instead of normal topic subscribers. The queues they consume are created on the fly when clients pick up messages from queues whose names end with “.VirtualTopic.NL.Order”, for example Consumer.Billing.VirtualTopic.NL.Order and Consumer.Credit.VirtualTopic.NL.Order.
With this solution, both drawbacks of a native JMS topic are overcome. Queues keep messages while their consumers are disconnected, and consumers on the same queue compete for the messages in it. This makes the solution scalable, with load balancing built in.
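The two-level behavior (one copy per application queue, then competing consumers inside each queue) can be modeled in a few lines of Java. This is a hypothetical sketch: the queue names follow ActiveMQ's default “Consumer.<name>.VirtualTopic.<topic>” convention, but the class, its methods and the round-robin dispatch are illustrative assumptions, not the broker's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a virtual topic: each published message is copied into every
// "Consumer.<app>.<topic>" queue, and consumers of the same queue compete (round-robin here).
class VirtualTopicModel {
    private final String topic;                                             // e.g. "VirtualTopic.NL.Order"
    private final Map<String, List<List<String>>> queues = new LinkedHashMap<>(); // queue -> one inbox per consumer
    private final Map<String, Integer> nextConsumer = new LinkedHashMap<>();      // queue -> next inbox index

    VirtualTopicModel(String topic) {
        this.topic = topic;
    }

    // Registers one more consumer for application 'app' and returns the queue it reads from.
    String addConsumer(String app) {
        String queue = "Consumer." + app + "." + topic;
        queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(new ArrayList<>());
        nextConsumer.putIfAbsent(queue, 0);
        return queue;
    }

    // Every consumer queue gets a copy; inside a queue exactly one consumer receives it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> e : queues.entrySet()) {
            List<List<String>> inboxes = e.getValue();
            int i = nextConsumer.get(e.getKey());
            inboxes.get(i).add(message);
            nextConsumer.put(e.getKey(), (i + 1) % inboxes.size());
        }
    }

    List<String> inbox(String queue, int consumerIndex) {
        return queues.get(queue).get(consumerIndex);
    }
}
```

With two billing consumers and one credit consumer, four published orders end up split 2/2 between the billing consumers, while the credit consumer receives all four.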
1.4 Message forwarding
The message forwarding feature effectively addresses the first requirement, which demands load balancing among remote consumers. In practice, this can be implemented in multiple ways, such as master-slave configurations, custom routing mechanisms, queue bridges and so on. ActiveMQ resolves it through message forwarding between brokers.
The challenging part of the requirement is that the broker hosting the order queue is probably not in the same network or geographical location as the brokers of the service teams, while the messages from the order queue still have to be delivered to the remote applications in a load-balanced manner.
Making this happen in ActiveMQ is simple: configure message forwarding from the Order Fulfillment Message Broker to a list of other brokers. Message forwarding can be configured for the whole broker or for specific queues. After that, service request consumers can consume messages from their corresponding message brokers without the queue having to be created in the remote brokers in advance. Message consumption behaves as if those consumers picked up messages directly from the Order Fulfillment Broker, and load balancing is achieved between the two remote consumers.
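Why forwarding yields load balancing can be sketched with a toy model: each network bridge to a remote broker acts like one more consumer of the local order queue, so the remote brokers compete for messages. The Java class below is hypothetical; the broker names are made up, and strict round-robin is a simplifying assumption, since real dispatch also depends on things like prefetch and consumer priority.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of store-and-forward between brokers: every network bridge behaves
// like a consumer of the local queue, so each message is forwarded to exactly one remote broker.
class ForwardingModel {
    private final Map<String, List<String>> remoteBrokers = new LinkedHashMap<>(); // broker -> forwarded messages
    private int next = 0; // index of the bridge that receives the next message

    void addRemoteBroker(String name) {
        remoteBrokers.putIfAbsent(name, new ArrayList<>());
    }

    // Forward one message to one remote broker, round-robin across the bridges.
    void enqueue(String message) {
        if (remoteBrokers.isEmpty()) {
            throw new IllegalStateException("no network bridge has demand for the queue");
        }
        List<String> names = new ArrayList<>(remoteBrokers.keySet());
        String target = names.get(next % names.size());
        remoteBrokers.get(target).add(message);
        next++;
    }

    List<String> forwardedTo(String broker) {
        return remoteBrokers.get(broker);
    }
}
```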
Summary
This article focused on destination features of MOM products, using ActiveMQ as an example. A follow-up article will explain consumer features, dispatch features and message features, and how they address business problems. It will also provide a table listing similar features/terms in different MOM products.
References
1. http://en.wikipedia.org/wiki/Message-oriented_middleware
2. http://en.wikipedia.org/wiki/AMQP
3. http://activemq.apache.org/features.html

Wednesday, February 08, 2012

ActiveMQ Recommendations (3) Server Configuration

Server resource
ActiveMQ uses the following configuration to limit system resources:
<systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="256 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="10 gb" />
        </storeUsage>
        <tempUsage>
          <tempUsage limit="1 gb"/>
        </tempUsage>
      </systemUsage>
</systemUsage>

memoryUsage limits the memory the broker uses to hold messages (non-persistent messages are kept only in memory), storeUsage limits the disk space used for persistent messages, and tempUsage limits the disk space used to store non-persistent messages when the memory limit is reached.
JMS Features Configuration
1. NIO protocol
NIO (New I/O) was introduced to Java as an alternative approach to network programming that gives access to the low-level, non-blocking I/O operations of the OS. It allows an application to handle more network clients and heavier loads with the same resources, which benefits JMS servers.
ActiveMQ supports an nio transport, which can improve broker performance compared with the plain tcp transport.
To configure the nio transport:
a. On the broker side, modify the transport connector and network connector URIs by replacing “tcp” with “nio”.
b. On the broker clients (the BizTalk or outm servers), configure the JMS provider with an nio URL as well.

2. Dynamic message forwarding
Dynamic message forwarding can be used to avoid unnecessary network overhead. With “dynamicallyIncludedDestinations”, messages are forwarded to a remote broker only when it has active consumers for the destination.
The configuration can be done as follows:
    <networkConnectors>
      <networkConnector name="outm-nc-general"
          uri="static:(tcp://<host>:<port>,tcp://<host>:<port>)"
          conduitSubscriptions="false"
          decreaseNetworkConsumerPriority="false"
          duplex="false">
        <excludedDestinations>
          <queue physicalName=">"/>
        </excludedDestinations>
        <dynamicallyIncludedDestinations>
          <queue physicalName="YELEI.TEST"/>
        </dynamicallyIncludedDestinations>
      </networkConnector>
    </networkConnectors>
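The forwarding decision described above can be sketched as a small Java model: the bridge forwards a message only if the destination is in the dynamically included list and the remote broker has reported at least one active consumer for it. This is a hypothetical illustration, not ActiveMQ's bridge code; the class and method names are made up, and it uses exact name matching only, although the real filters also accept wildcards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of demand-based forwarding across a network bridge.
class DemandForwardingModel {
    private final Set<String> dynamicallyIncluded;                        // destinations eligible for forwarding
    private final Map<String, Integer> remoteConsumers = new HashMap<>(); // destination -> active remote consumers

    DemandForwardingModel(Set<String> dynamicallyIncluded) {
        this.dynamicallyIncluded = dynamicallyIncluded;
    }

    // The remote broker advertises consumers being created and closed.
    void remoteConsumerAdded(String destination) {
        remoteConsumers.merge(destination, 1, Integer::sum);
    }

    void remoteConsumerRemoved(String destination) {
        // Returning null from the remapping function removes the entry when the count reaches zero.
        remoteConsumers.computeIfPresent(destination, (d, n) -> n > 1 ? n - 1 : null);
    }

    // Forward only included destinations that currently have remote demand.
    boolean shouldForward(String destination) {
        return dynamicallyIncluded.contains(destination) && remoteConsumers.containsKey(destination);
    }
}
```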

3. Destination policy
a. Message expiration
Messages can be set to expire if they stay in the queue too long, so that stale messages do not degrade broker/queue performance. In JMS, a message's lifetime is its time-to-live, which the producer sets; by default it is 0, meaning the message never expires. ActiveMQ checks queues for expired messages periodically, by default every 30 seconds. Expired persistent messages are moved to the dead letter queue (DLQ). (In the current 5.5.1 version, the behavior varies depending on whether consumers are present for the queue: if so, messages that are not consumed in time go to the DLQ; if no consumers are connected, the messages stay in the broker storage.) Expired non-persistent messages are discarded by default, but the broker can also be configured to move them to the DLQ.
By default, all expired messages go to a queue called ActiveMQ.DLQ. This is inconvenient when there are many queues to manage, so usually an individual dead letter queue is configured for each queue. For example:
<destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue="YELEI.TEST" expireMessagesPeriod="60000">
          <deadLetterStrategy>
            <individualDeadLetterStrategy
                queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

In the above example, processNonPersistent enables storing expired non-persistent messages in the DLQ; this property is also available for sharedDeadLetterStrategy. expireMessagesPeriod makes the broker check the queue for expired messages every 60 seconds, and a value of 0 disables this periodic check. How long a message lives is still determined by the time-to-live that the producer sets.
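To show how time-to-live, the broker's periodic expiry check and the dead letter strategy interact, here is a hypothetical Java model. It follows the JMS rule that a message's expiration is its send time plus its time-to-live, with 0 meaning “never expires”; in a real client the producer sets the time-to-live with MessageProducer.setTimeToLive(long). The class, its fields and the sweep() method are illustrative only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of message expiry and dead-lettering (not ActiveMQ code).
class ExpiryModel {
    static final class Msg {
        final String id;
        final long expiration;   // 0 means the message never expires
        final boolean persistent;

        Msg(String id, long sendTime, long timeToLive, boolean persistent) {
            this.id = id;
            this.expiration = timeToLive > 0 ? sendTime + timeToLive : 0;
            this.persistent = persistent;
        }

        boolean isExpired(long now) {
            return expiration != 0 && now > expiration;
        }
    }

    final List<Msg> queue = new ArrayList<>();
    final List<Msg> dlq = new ArrayList<>();
    final boolean processNonPersistent; // also dead-letter expired non-persistent messages

    ExpiryModel(boolean processNonPersistent) {
        this.processNonPersistent = processNonPersistent;
    }

    // One periodic expiry check at time 'now': expired persistent messages go to the DLQ,
    // expired non-persistent ones are dropped unless processNonPersistent is set.
    void sweep(long now) {
        for (Iterator<Msg> it = queue.iterator(); it.hasNext(); ) {
            Msg m = it.next();
            if (m.isExpired(now)) {
                it.remove();
                if (m.persistent || processNonPersistent) {
                    dlq.add(m);
                }
            }
        }
    }
}
```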

b. Producer flow control
Enabling producer flow control helps when the broker reaches a resource limit such as memory or storage: it keeps producers waiting until the broker has the resources to continue processing inbound messages.
It can be configured as follows:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.TEST" producerFlowControl="true" memoryLimit="10mb">
            ……
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>

In the system usage configuration, the broker can instead be told to fail the send if no space becomes available within a certain time (here 3000 ms):
<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
                 ……
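Both behaviors (blocking the producer, or failing the send after a timeout) can be sketched with a bounded queue from java.util.concurrent. This is a hypothetical model: the ArrayBlockingQueue capacity stands in for the destination's memoryLimit, and the class name is made up. In ActiveMQ, a send that times out surfaces to the producer as a javax.jms.ResourceAllocationException; the model throws IllegalStateException to stay self-contained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of producer flow control with a send timeout.
class FlowControlModel {
    private final BlockingQueue<String> destination; // capacity plays the role of memoryLimit

    FlowControlModel(int capacity) {
        destination = new ArrayBlockingQueue<>(capacity);
    }

    // Waits up to timeoutMillis for space and fails the send if none becomes available,
    // like a broker configured with sendFailIfNoSpaceAfterTimeout. Without a timeout
    // (BlockingQueue.put) the producer would simply wait until space is freed.
    void send(String message, long timeoutMillis) {
        boolean accepted;
        try {
            accepted = destination.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            accepted = false;
        }
        if (!accepted) {
            throw new IllegalStateException("no space for " + message + " after " + timeoutMillis + " ms");
        }
    }

    // A consumer taking a message frees space, which lets a waiting producer continue.
    String consume() {
        return destination.poll();
    }
}
```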

c. Message threading
By default, when a producer sends a message to ActiveMQ and the message is then delivered to a consumer, five threads are involved in sequence: the producer session thread, the ActiveMQ inbound transport thread, the ActiveMQ dispatch thread, the outbound transport thread and the consumer session thread.
ActiveMQ is designed to use multithreading and asynchronous communication as much as possible to maximize performance.
If we make the connection to the broker synchronous by using “tcp://jmshost?async=false”, the broker disables the default asynchronous communication, so only three threads are involved: the producer session thread, the ActiveMQ dispatch thread and the consumer session thread. In this scenario, the ActiveMQ inbound transport thread is not created, because the producer controls the transport and waits for an acknowledgement when the transport succeeds. The dispatch thread also waits until the message is sent to the consumer.
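The difference between the two modes is essentially whether work is handed off to another thread. The hypothetical Java sketch below illustrates only that idea, not ActiveMQ internals: a single-threaded executor named “transport-thread” stands in for a transport thread, and the method reports which thread performed the “transmission”. In asynchronous mode a real caller would continue without waiting; the sketch waits only so it can report the thread name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of asynchronous hand-off versus synchronous execution.
class HandoffModel implements AutoCloseable {
    private final ExecutorService transport = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "transport-thread");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });

    // Returns the name of the thread that performed the (simulated) transmission.
    String transmittingThread(boolean async) {
        Callable<String> transmit = () -> Thread.currentThread().getName();
        try {
            // async: hand off to the transport thread; sync: run on the caller's thread.
            return async ? transport.submit(transmit).get() : transmit.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        transport.shutdown();
    }
}
```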
On the dispatch side, an option that can streamline the processing of large numbers of small messages is optimizedDispatch. According to the ActiveMQ documentation, this per-destination option stops the broker from using a separate thread for dispatching from a queue, so messages are dispatched to consumers without that extra thread hop. The configuration can be done as follows:
   <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.TEST" optimizedDispatch="true">
            ……
          </policyEntry>
        </policyEntries>
      </policyMap>
   </destinationPolicy>

d. Message cursors
In a normal produce-consume scenario, after the broker receives a message, the message is first persisted (by default) and then placed in the in-memory dispatch queue, ready for consumer clients to pick up.
However, when consumers are slower than producers, ActiveMQ uses a message cursor as a buffer between the persistence store and the dispatch queue. This default cursor, which reads messages from the message store, is called the store-based cursor.
There are two alternatives: vmQueueCursor and fileQueueCursor.
Instead of pointing to messages in the message store, vmQueueCursor holds references to messages in memory. This can be efficient, but it is not suitable for very slow or temporarily inactive consumers, because the pending messages then fill up memory.
In comparison, fileQueueCursor refers to messages stored in temporary files. It can be more efficient than the store-based cursor when the message store performs poorly because it holds a huge number of messages. The same mechanism as fileQueueCursor is used to store non-persistent messages when the memory limit is reached.
The configuration is done per destination/queue. An example is as follows:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.>">
            <pendingQueuePolicy>
                <vmQueueCursor />
                <!-- or: <fileQueueCursor/> -->
            </pendingQueuePolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
 </destinationPolicy>

e. Message Pending
ActiveMQ pushes messages into consumers' memory in advance (prefetch) to increase message processing performance. By default, the queue prefetch size is 1000 for both persistent and non-persistent messages. In addition, messages can be held in broker memory as pending messages when consumers have reached their prefetch size.
Once the number of pending messages reaches the limit and new messages keep coming in, the oldest pending messages are discarded. If the limit is set to “-1”, messages are never discarded.
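The number of pending messages kept in the broker can be configured per destination as follows (note that ActiveMQ applies this limit to non-durable topic subscribers, where slow consumers would otherwise exhaust broker memory):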
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="YELEI.>">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="100"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
      </destinationPolicy>

An alternative to constantPendingMessageLimitStrategy is prefetchRatePendingMessageLimitStrategy, which sets the limit as a multiplier of the consumer's prefetch size.
f. Total ordering
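The interplay of prefetch and the pending-message limit can be modeled for a single slow consumer in Java. This is a hypothetical sketch, not ActiveMQ's implementation: messages first fill the consumer's prefetch buffer, further messages wait as pending in the broker, and once the pending list exceeds the limit the oldest pending message is dropped. The static factory mirrors the idea of prefetchRatePendingMessageLimitStrategy, where the limit is a multiple of the prefetch size.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a pending-message limit for one slow consumer.
class PendingLimitModel {
    private final int prefetch;
    private final int limit;                         // -1 means never discard
    final Deque<String> prefetched = new ArrayDeque<>();
    final Deque<String> pending = new ArrayDeque<>();
    int discarded = 0;

    PendingLimitModel(int prefetch, int limit) {
        this.prefetch = prefetch;
        this.limit = limit;
    }

    // Like a prefetch-rate strategy: the limit is a multiple of the prefetch size.
    static PendingLimitModel withPrefetchRate(int prefetch, double multiplier) {
        return new PendingLimitModel(prefetch, (int) (prefetch * multiplier));
    }

    // Dispatch one message; the consumer is assumed not to consume anything meanwhile.
    void dispatch(String message) {
        if (prefetched.size() < prefetch) {
            prefetched.addLast(message);
            return;
        }
        pending.addLast(message);
        if (limit >= 0 && pending.size() > limit) {
            pending.removeFirst(); // drop the oldest pending message
            discarded++;
        }
    }
}
```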
Total ordering, like an exclusive consumer, controls the order in which messages are processed. The difference is that multiple consumers can exist: with strictOrderDispatchPolicy, every consumer of a topic receives the messages in exactly the same order. The cost compared with an exclusive consumer is the synchronization overhead in the broker's dispatch.
It can be configured through a destination policy:
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="YELEI.>">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>
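The synchronization cost can be seen in a small hypothetical Java model of strict-order dispatch: each message is delivered to every subscriber (topic semantics) while holding one dispatch lock, so concurrent publishers cannot be interleaved differently for different subscribers. The class and methods are illustrative only; the single lock is exactly the serialization point that costs throughput.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of total ordering across subscribers.
class StrictOrderTopicModel {
    private final List<List<String>> subscribers = new ArrayList<>();
    private final Object dispatchLock = new Object();

    List<String> subscribe() {
        List<String> inbox = Collections.synchronizedList(new ArrayList<>());
        synchronized (dispatchLock) {
            subscribers.add(inbox);
        }
        return inbox;
    }

    // Delivering to all subscribers under one lock gives every subscriber the same order.
    void publish(String message) {
        synchronized (dispatchLock) {
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    // Demo helper: several publisher threads publishing at the same time.
    void publishConcurrently(int threads, int perThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    publish(id + "-" + i);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```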

4. Large XML processing
In ActiveMQ, BLOB messages can be used to deliver large XML files. However, queue clients might not support this feature out of the box, so customization is required. We recommend using a web service on the ActiveMQ server as the large-message producer, because web services are supported by most platforms on the market, and the only parameters needed to invoke it are the queue name and the file URL, which cause little network overhead. The Jetty server bundled with ActiveMQ can also host web service implementations like this.
a. Large message producer:
 i. Define a WSDL file that contains the interface to enqueue a big file; the parameters should contain the file URL and the destination queue name.
 ii. Implement the WSDL interface in a Java application, and expose it as a web service.
 iii. Deploy the web service to the ActiveMQ server.
b. Large message consumer:
Implementation varies depending on which platform is used for development.
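The service contract from step a can be sketched as a plain Java interface plus an in-memory stand-in that is handy for testing callers. All names here are hypothetical. The point is that the caller passes only the destination queue name and the file's URL; turning that into an actual message (for example an ActiveMQ BLOB message) and exposing the interface as a web service are left out of the sketch.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical contract of the large-message producer service: only a queue name
// and a file URL travel in the call, never the file content itself.
interface LargeMessageService {
    void enqueueLargeFile(String destinationQueue, URI fileUrl);
}

// In-memory stand-in that validates the request and records what would be enqueued.
class RecordingLargeMessageService implements LargeMessageService {
    final List<String> enqueued = new ArrayList<>();

    @Override
    public void enqueueLargeFile(String destinationQueue, URI fileUrl) {
        if (destinationQueue == null || destinationQueue.isBlank()) {
            throw new IllegalArgumentException("destination queue is required");
        }
        if (fileUrl == null || !fileUrl.isAbsolute()) {
            throw new IllegalArgumentException("file URL must be absolute: " + fileUrl);
        }
        // A real implementation would create and send a message referencing fileUrl here.
        enqueued.add(destinationQueue + " <- " + fileUrl);
    }
}
```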

ActiveMQ Recommendations (2) Consumer Configuration

Exclusive consumer

1. In a multiple-consumer scenario, messages are processed concurrently and sequential processing cannot be guaranteed. An exclusive consumer can be used to guarantee the order of message processing.
2. In ActiveMQ, an exclusive consumer is configured with a destination option when the consumer is created.
Instead of the queue name “YELEI.TEST”, use “YELEI.TEST?consumer.exclusive=true”.
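With a JMS client, the option is simply part of the queue name passed to session.createQueue("YELEI.TEST?consumer.exclusive=true"). The broker-side effect can be sketched with a hypothetical Java model: one active consumer receives every message in queue order, the others stay idle, and when it disconnects the next one in line takes over. Redelivery of unacknowledged messages on failover is ignored in this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an exclusive consumer on a queue.
class ExclusiveQueueModel {
    private final List<String> active = new ArrayList<>();                  // connection order = failover order
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>(); // consumer -> messages received
    private final List<String> backlog = new ArrayList<>();                 // messages not yet dispatched

    void connect(String consumer) {
        if (!active.contains(consumer)) {
            active.add(consumer);
        }
        inboxes.putIfAbsent(consumer, new ArrayList<>());
        dispatch();
    }

    void disconnect(String consumer) {
        active.remove(consumer); // the next consumer in line becomes the exclusive one
    }

    void send(String message) {
        backlog.add(message);
        dispatch();
    }

    // Only the first active consumer receives messages; the others stay idle.
    private void dispatch() {
        if (active.isEmpty()) {
            return; // keep messages queued until a consumer connects
        }
        inboxes.get(active.get(0)).addAll(backlog);
        backlog.clear();
    }

    List<String> received(String consumer) {
        return inboxes.getOrDefault(consumer, List.of());
    }
}
```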

Message Prefetch

1. To maximize the performance of message consumers, ActiveMQ by default pushes a certain number of messages to consumers in advance over their connections (prefetch). By default, the queue prefetch size is 1000 for both persistent and non-persistent messages. If slow consumers process a number of big files, the prefetch size should probably be decreased.
Note that in a multiple-consumer scenario, messages prefetched by one consumer are not available to other consumers unless that consumer is considered inactive. This behavior can also harm the sequential processing of messages.
2. The prefetch size can be configured per connection or per queue.
Example: “tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100” (per connection) or “YELEI.TEST?consumer.prefetchSize=100” (per queue).
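The effect of the prefetch size on a backlog can be sketched with a deliberately simple, hypothetical Java model: consumers connect one after another, and the broker fills each one up to its prefetch limit before moving on. With the default of 1000, the first consumer can take a whole backlog of 500 messages, leaving nothing for a second consumer that connects later; with a prefetch of 100, the work is spread out and the rest stays in the broker.

```java
// Hypothetical model of how prefetch distributes an existing backlog among consumers.
class PrefetchModel {

    // Returns how many messages each consumer holds in its prefetch buffer.
    static int[] distribute(int backlog, int prefetch, int consumers) {
        int[] buffered = new int[consumers];
        int remaining = backlog;
        // Consumers connect one after another; each is filled up to its prefetch limit.
        for (int c = 0; c < consumers && remaining > 0; c++) {
            buffered[c] = Math.min(prefetch, remaining);
            remaining -= buffered[c];
        }
        return buffered; // anything still 'remaining' waits in the broker
    }
}
```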

ActiveMQ Recommendations (1) Producer Configuration

Control message persistence
Keeping messages in broker memory instead of persisting them in broker storage significantly improves broker performance.
Without message persistence, there is a small risk that messages are lost when the broker crashes. However, this option is still widely used in the following scenarios:
a. If messages are not critical.
b. If messages can easily be recovered on the producer side.
Non-persistent messaging should be avoided when messages are critical and not easy to reproduce. It should also be avoided when broker performance is not the bottleneck; in many cases, the bottleneck lies in the message consumers or producers.

Persistence can be set on the producer side when connecting to the broker queue (in JMS terms, it is the producer's delivery mode). Normally it is set per queue. It can also be set at the broker level, but that is not recommended.

Control message acknowledge mode
Every JMS session has an acknowledge mode, chosen when the session is created. The usual default is AUTO_ACKNOWLEDGE, in which the session automatically acknowledges a message as soon as it has been successfully received or processed by the message listener. This keeps duplicates to a minimum, although a message can still be redelivered if a failure occurs before the acknowledgement.
An alternative is DUPS_OK_ACKNOWLEDGE, in which the session acknowledges messages lazily. This reduces acknowledgement overhead when huge amounts of messages are processed, but if a failure occurs, some messages may be delivered again, so delivery becomes “at-least-once”. It is used when system resources need to be freed up quickly and there is logic on the broker or consumer side to ignore duplicate messages.
The acknowledge mode is set when creating the session on the connection to the broker.
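The duplicate-ignoring logic mentioned above is often an idempotent consumer keyed on the JMS message ID (Message.getJMSMessageID()). Below is a minimal hypothetical sketch in Java: it remembers a bounded window of recently seen IDs using a LinkedHashMap with eviction. The class name and the window size are assumptions; the window must cover the range of redeliveries you expect.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical idempotent-consumer helper for at-least-once delivery.
class DuplicateFilter {
    private final Map<String, Boolean> seen;

    DuplicateFilter(int window) {
        // Access-ordered map that evicts the least recently seen ID beyond 'window' entries.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > window;
            }
        };
    }

    // Returns true the first time an ID is seen (process it), false for a duplicate (skip it).
    boolean firstDelivery(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```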

Control message communication pattern
By default, ActiveMQ sends persistent messages outside a transaction synchronously, and non-persistent messages (or messages sent within a transaction) asynchronously. When the client has specific settings for synchronous or asynchronous delivery, these override the defaults, so it is wise to check the settings on the BizTalk and outm side. The communication pattern is set per broker connection. Example: "tcp://localhost:61616?jms.useAsyncSend=true"
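Since several of these settings are query options on the client connection URI, a small helper that assembles them can keep configurations consistent. This is a hypothetical Java sketch; only the option names jms.useAsyncSend and jms.prefetchPolicy.queuePrefetch come from these posts, and option values are assumed to be URL-safe, so no encoding is done.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that appends client options to a broker connection URI.
class BrokerUri {

    static String withOptions(String base, Map<String, String> options) {
        if (options.isEmpty()) {
            return base;
        }
        // Start a query string, or extend an existing one.
        String prefix = base + (base.contains("?") ? "&" : "?");
        StringJoiner query = new StringJoiner("&", prefix, "");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return query.toString();
    }
}
```

Passing an ordered map such as a LinkedHashMap keeps the options in a predictable order, which makes generated URIs easy to compare across environments.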