Thursday, June 9, 2016

[WSO2][ESB][JMS] Local Transactionality Sample

Hi all,

Let's look at a sample for the JMS transport parameter [1] "transport.Transactionality". Here I use local transactionality.

When it comes to transactionality, it helps to establish reliable message transfer. We can use transactions and roll them back when there is any issue or error. WSO2 ESB supports two main types of transactions through the JMS transport: local and JTA.

Local transactions are basically used for locally used WSO2 ESB instances, while JTA transactions are used for distributed systems. For example, a JTA transaction may be involved in a distributed system that has a cluster of ESBs which also add data to a database.
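To make the commit and rollback idea concrete, here is a minimal in-memory sketch. It is only a simplified model that I wrote for illustration, not the JMS API and not ESB code: the class name LocalTxSketch and its methods are hypothetical. It assumes that a received message stays "in flight" until the consumer either commits (the message is gone) or rolls back (the message returns to the queue).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified in-memory model of a locally transacted consumer (not the JMS API).
class LocalTxSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private String inFlight; // message received but not yet committed or rolled back

    void publish(String message) {
        queue.addLast(message);
    }

    // Take the next message; it stays "in flight" until commit or rollback.
    String receive() {
        inFlight = queue.pollFirst();
        return inFlight;
    }

    // Commit: the in-flight message is consumed for good.
    void commit() {
        inFlight = null;
    }

    // Rollback: the in-flight message goes back to the head of the queue.
    void rollback() {
        if (inFlight != null) {
            queue.addFirst(inFlight);
            inFlight = null;
        }
    }

    int depth() {
        return queue.size();
    }

    public static void main(String[] args) {
        LocalTxSketch session = new LocalTxSketch();
        session.publish("quote-request");

        session.receive();
        session.rollback(); // processing failed: the message is back on the queue
        System.out.println("after rollback: " + session.depth()); // prints after rollback: 1

        session.receive();
        session.commit();   // processing succeeded: the message is consumed
        System.out.println("after commit: " + session.depth());   // prints after commit: 0
    }
}
```

In the sample that follows, the ESB and the message broker play these roles: the proxy is the consumer, and the fault sequence is what asks for the rollback.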

Let's take a simple example of a local transaction that is rolled back on a failure. Let's begin...

Prerequisites:

1. A message broker should be up and running (e.g. WSO2 MB).

2. The ESB should be configured for the relevant message broker. Add the "transport.Transactionality" parameter and the "transport.jms.SessionTransacted" parameter.

Sample axis2.xml Configuration:

 <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">   
  <parameter name="myTopicConnectionFactory" locked="false">   
     <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>   
     <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>    
     <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
     <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>  
     <parameter name="transport.jms.Transactionality" locked="true">local</parameter>   
     <parameter name="transport.jms.SessionTransacted" locked="true">true</parameter>  
  </parameter>

    <parameter name="myQueueConnectionFactory" locked="false">   
     <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>   
     <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>   
     <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>   
     <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>   
     <parameter name="transport.jms.Transactionality" locked="true">local</parameter>   
     <parameter name="transport.jms.SessionTransacted" locked="true">true</parameter>   
    </parameter>       

    <parameter name="default" locked="false">   
     <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>   
     <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>   
     <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
     <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>   
     <parameter name="transport.jms.Transactionality" locked="true">local</parameter>   
     <parameter name="transport.jms.SessionTransacted" locked="true">true</parameter>   
    </parameter>   
   </transportReceiver>   


3. ESB should be up and running.

4. The Axis2 server should be started on port 9000.
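Before running the sample, it can help to confirm that the prerequisites above are actually listening. The following is a small sketch of my own, using only plain Java sockets; the class name PortCheckSketch is hypothetical. The ports are assumptions taken from this post: 5672 is the broker port used in the publisher class, and 9000 is the Axis2 server port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Stand-alone helper: checks whether something accepts TCP connections on a port.
class PortCheckSketch {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }

    public static void main(String[] args) {
        // Ports assumed from this post; adjust them to your own setup.
        System.out.println("Message broker (5672): " + isListening("localhost", 5672, 1000));
        System.out.println("Axis2 server (9000): " + isListening("localhost", 9000, 1000));
    }
}
```

This only shows that the port is open, not that the right service is behind it, so treat it as a quick sanity check.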

Sample:

Create a proxy with the following configuration. (For the proxy's in sequence and fault sequence, I created two separate sequences.)

 <proxy name="ErrorProxy" startOnLoad="true" transports="https http jms">  
     <target faultSequence="fault" inSequence="ErrorInSequence"/>  
     <parameter name="transport.jms.ContentType">  
       <rules>  
         <jmsProperty>contentType</jmsProperty>  
         <default>text/xml</default>  
       </rules>  
     </parameter>  
     <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>  
     <parameter name="transport.jms.DestinationType">queue</parameter>  
     <parameter name="transport.jms.SessionTransacted">true</parameter>  
     <parameter name="transport.jms.Destination">errorQueue</parameter>  
   </proxy>  

Error Sequence Configuration:

  <sequence name="fault">  
     <log level="full"/>  
     <property name="SET_ROLLBACK_ONLY" scope="axis2" value="true"/>  
   </sequence>  

In Sequence Configuration:

   <sequence name="ErrorInSequence">  
     <call blocking="true">  
       <endpoint>  
         <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>  
       </endpoint>  
     </call>  
     <description/>  
   </sequence>  

Running Sample:

* Publish messages to the queue (errorQueue) in the message broker. I used an external JMS message publisher.

External JMS Publisher Java Class:

 import javax.jms.*;  
 import javax.naming.Context;  
 import javax.naming.InitialContext;  
 import javax.naming.NamingException;  
 import java.util.Properties;  
 /**  
  * Created by dilshani on 5/11/16.  
  * This class will publish messages to MB Queue  
  */  
 public class QueuePublisher {  
   public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";  
   private static final String CF_NAME_PREFIX = "connectionfactory.";  
   private static final String QUEUE_NAME_PREFIX = "queue.";  
   private static final String CF_NAME = "qpidConnectionfactory";  
   String userName = "admin";  
   String password = "admin";  
   private static String CARBON_CLIENT_ID = "carbon";  
   private static String CARBON_VIRTUAL_HOST_NAME = "carbon";  
   private static String CARBON_DEFAULT_HOSTNAME = "localhost";  
   private static String CARBON_DEFAULT_PORT = "5672";  
   // Must match the transport.jms.Destination of the proxy (errorQueue)
   String queueName = "errorQueue";
   int publishMsgCount=1;  
   public static void main(String[] args) throws NamingException, JMSException {  
     QueuePublisher queueSender = new QueuePublisher();  
     queueSender.sendMessages();  
   }  
   public void sendMessages() throws NamingException, JMSException {  
     // JNDI settings: context factory, connection factory URL and queue binding
     Properties properties = new Properties();
     properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
     properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
     properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
     System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
     InitialContext ctx = new InitialContext(properties);
     // Lookup connection factory  
     QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);  
     QueueConnection queueConnection = connFactory.createQueueConnection();  
     queueConnection.start();  
     // The publisher itself uses a non-transacted, auto-acknowledge session
     QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
     Queue queue = (Queue)ctx.lookup(queueName);  
     javax.jms.QueueSender queueSender = queueSession.createSender(queue);  
     // Send publishMsgCount messages to the queue looked up above
     for (int i = 1; i <= publishMsgCount; i++) {
       TextMessage textMessage = queueSession.createTextMessage("::::Test Message:::: " + i + ":::Publish to queue:::" + queueName + ":::From IP:::" + CARBON_DEFAULT_HOSTNAME);
       textMessage.setStringProperty("msgID", String.valueOf(i));
       queueSender.send(textMessage);
       System.out.println("Publishing Test Message " + i + "::Published From IP::" + CARBON_DEFAULT_HOSTNAME);
     }
     queueSender.close();  
     queueSession.close();  
     queueConnection.close();  
   }  
   public String getTCPConnectionURL(String username, String password) {  
     // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'  
     return new StringBuilder()
         .append("amqp://").append(username).append(":").append(password)  
         .append("@").append(CARBON_CLIENT_ID)  
         .append("/").append(CARBON_VIRTUAL_HOST_NAME)  
         .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")  
         .toString();  
   }  
 }  
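To see what the publisher actually hands to JNDI, here is a small stand-alone sketch that only builds the same strings, so it needs neither a broker nor the JMS jars. The class name JndiSettingsSketch and its method names are hypothetical; the values (admin/admin, carbon, localhost, 5672, qpidConnectionfactory) are copied from the publisher class above, and errorQueue is the queue the proxy listens on.

```java
import java.util.Properties;

// Stand-alone illustration: it only builds strings and needs no broker or JMS jar.
class JndiSettingsSketch {

    // Same URL layout as getTCPConnectionURL in the publisher class.
    static String connectionUrl(String user, String password, String host, String port) {
        return "amqp://" + user + ":" + password + "@carbon/carbon"
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // The three JNDI entries the publisher passes to InitialContext.
    static Properties jndiProperties(String queueName) {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        properties.put("connectionfactory.qpidConnectionfactory",
                connectionUrl("admin", "admin", "localhost", "5672"));
        properties.put("queue." + queueName, queueName);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = jndiProperties("errorQueue");
        // Print every entry; the iteration order is not guaranteed.
        for (String key : properties.stringPropertyNames()) {
            System.out.println(key + " = " + properties.getProperty(key));
        }
    }
}
```

With these values the connection factory entry comes out as amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672', and the queue entry maps queue.errorQueue to errorQueue.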

* You should see the response from the backend (Axis2 server on port 9000).

* Shut down the Axis2 server and publish messages to the JMS queue again.

* Check the ESB server terminal. It may print the fault sequence log several times. (Each time, it is trying to roll back the transaction.)

Note: Because the transaction is rolled back, the message is returned to the JMS queue. The proxy is still listening on that queue, so it picks the message up again and tries to send it. That fails again, the fault sequence runs again, and the cycle repeats (until it times out).
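The cycle described in the note can be sketched as a plain loop. This is only a simplified model with no broker or ESB involved; the class name RedeliveryLoopSketch is hypothetical, and the maxDeliveries cap is my own stand-in for the time out mentioned above, not a WSO2 setting.

```java
import java.util.function.IntPredicate;

// Simplified model of the rollback and redelivery cycle (no broker or ESB involved).
class RedeliveryLoopSketch {

    // Returns the delivery attempt on which the backend call succeeded,
    // or -1 if maxDeliveries was reached without a successful call.
    static int deliverUntilCommitted(IntPredicate backendUpOnAttempt, int maxDeliveries) {
        for (int attempt = 1; attempt <= maxDeliveries; attempt++) {
            if (backendUpOnAttempt.test(attempt)) {
                return attempt; // call succeeded: the transaction is committed
            }
            // Call failed: the fault sequence asks for a rollback,
            // so the message returns to the queue and is delivered again.
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: the backend is restarted before the 4th delivery.
        System.out.println(deliverUntilCommitted(attempt -> attempt >= 4, 10)); // prints 4
        // The backend never comes back: the cap is reached.
        System.out.println(deliverUntilCommitted(attempt -> false, 10));        // prints -1
    }
}
```

In the real sample, restarting the Axis2 server plays the role of the backend coming back: the next redelivery then goes through and the fault sequence stops logging.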

Now you have a working sample of ESB local transactionality.

