Friday, April 18, 2014

Sample on using WSO2 MB as the JMS message broker with WSO2 CEP

This post is based on Sample 0002 - Simple pass-through with JMS which uses ActiveMQ as the message broker. This describes how to use WSO2 Message Broker(MB) instead of ActiveMQ in Sample 0002

In this sample,
  • An external producer generate events for the topic "AllStockQuotes" to MB. 
  • CEP receives events from MB by subscribing for "AllStockQuotes" topic. 
  • Then CEP extracts several fields from incoming events and publishes them back to MB under the topic "BasicStockQuotes".
  • An external consumer receives events by subscribing to "BasicStockQuotes" topic in MB.

1. Bring up WSO2 MB server by executing script in <MB_HOME>/bin.

2. Goto <CEP_HOME>/repository/conf and open file, then set the following property,
  • connectionfactory.TopicConnectionFactory=amqp://admin:admin@clientid/carbon?brokerlist='tcp://localhost:5672' 
    (5672 is the listening port of the MB)
Also add the following entry,
  • topic.BasicStockQuotes = BasicStockQuotes 
(BasicStockQuotes is the topic under which CEP publishes events)

3. Bring up WSO2 CEP server with the configurations of sample 0002 by executing following command line,

./ -sn 0002 -DportOffset=1 -Dqpid.dest_syntax=BURL 
By setting portOffset=1, we offset the ports used in CEP server by 1 to avoid port conflicts with MB. This is only required if both MB and CEP servers are running on the same node.  Also, we force the usage of Binding URL(BURL) address syntax by setting  qpid.dest_syntax system property to BURL.

4. Create an JMS input event adaptor to receive messages from WSO2 MB with following configurations.
5. Create an JMS Output event adaptor to publish messages to WSO2 MB with following configurations.

 6. Add a new event builder to stream org.wso2.sample.stock.quote.basic:1.0.0 as follows,

7. Add a new event formatter to stream  org.wso2.sample.stock.quote.basic:1.0.0 as follows,

8. Goto <CEP_HOME>/samples/consumers/jms/src/main/resources and open file, then set the following properties,
  • java.naming.factory.initial = org.wso2.andes.jndi.PropertiesFileInitialContextFactory
  • connectionfactory.ConnectionFactory=amqp://admin:admin@clientid/carbon?brokerlist='tcp://localhost:5672'

Now goto <CEP_HOME>/samples/consumers/jms and run 'ant topicConsumer -Dtopic=BasicStockQuotes' command. Now the consumer will subscribe to MB for messages on "BasicStockQuotes" topic.

9. Goto <CEP_HOME>/samples/producers/stock-quote/src/main/resources and open file, then set the properties mentioned in step 7 in this file also. Then execute ant command. Now the producer will start publishing events to MB.

In the consumer terminal you must be able to see the receiving stock quote events.