Wednesday, May 11, 2016

How to write a JMS publisher for queue in WSO2 MB ?

Hello all,

We are gonna write a JMS publisher for queue WSO2 MB. Let's begin ...

Prerequisites:
  • WSO2 MB pack should be downloaded.
  • MB should be up and running (You can follow WSO2 MB documentation, if you are not familiar with MB) 
  • Java should be installed (As we implement JMS publisher as a Java Class)
Environment used:
  • Oracle jdk 1.8
  • MB 3.1.0
  • IDEA as 
JMS Publisher class:

 import javax.jms.*;  
 import javax.naming.Context;  
 import javax.naming.InitialContext;  
 import javax.naming.NamingException;  
 import java.util.Properties;  
 /**  
  * This class will publish messages to MB Queue  
  */  
 public class QueuePublisher {  
   public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";  
   private static final String CF_NAME_PREFIX = "connectionfactory.";  
   private static final String QUEUE_NAME_PREFIX = "queue.";  
   private static final String CF_NAME = "qpidConnectionfactory";  
   String userName = "admin";  
   String password = "admin";  
   private static String CARBON_CLIENT_ID = "carbon";  
   private static String CARBON_VIRTUAL_HOST_NAME = "carbon";  
   private static String CARBON_DEFAULT_HOSTNAME = "localhost";  
   private static String CARBON_DEFAULT_PORT = "5672";  
   String queueName = "JMSProxy";  //Change this with relevant queue name
   int publishMsgCount=5;  
   public static void main(String[] args) throws NamingException, JMSException {  
     QueuePublisher queueSender = new QueuePublisher();  
     queueSender.sendMessages();  
   }  
   public void sendMessages() throws NamingException, JMSException {  
     Properties properties = new Properties();  
     properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);  
     properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));  
     properties.put(QUEUE_NAME_PREFIX + queueName, queueName);  
     Properties properties2 = new Properties();  
     properties2.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);  
     properties2.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));  
     System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));  
     InitialContext ctx = new InitialContext(properties);  
     // Lookup connection factory  
     QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);  
     QueueConnection queueConnection = connFactory.createQueueConnection();  
     queueConnection.start();  
     QueueSession queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE );//  
     Queue queue = (Queue)ctx.lookup(queueName);  
     javax.jms.QueueSender queueSender = queueSession.createSender(queue);  
     //sending 5 messages to the above created queue here  
     for(Integer i=1;i<=publishMsgCount;i=i+1){  
       TextMessage textMessage = queueSession.createTextMessage("::::Test Message:::: "+i+":::Publish to queue:::"+queueName+":::From IP:::"+CARBON_DEFAULT_HOSTNAME);  
       textMessage.setStringProperty("msgID", i.toString());  
       queueSender.send(textMessage);  
       System.out.println("Publishing Test Message "+i+"::Published From IP::"+CARBON_DEFAULT_HOSTNAME);  
     }  
     queueSender.close();  
     queueSession.close();  
     queueConnection.close();  
   }  
   public String getTCPConnectionURL(String username, String password) {  
     // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'  
     return new StringBuffer()  
         .append("amqp://").append(username).append(":").append(password)  
         .append("@").append(CARBON_CLIENT_ID)  
         .append("/").append(CARBON_VIRTUAL_HOST_NAME)  
         .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")  
         .toString();  
   }  
 }  


** For this java class you have to add client libraries as dependencies. You can find relevant jars from $MB_HOME/client-lib/

Add all these jars as dependencies.
  • andes-client-3.1.1.jar
  • geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
  • log4j-1.2.13.jar
  • org.wso2.carbon.logging-4.4.1.jar
  • org.wso2.securevault-1.0.0-wso2v2.jar
  • slf4j-1.5.10.wso2v1.jar
You are successfully done with JMS publisher. Add a queue in MB which you want to publish messages and add that name in the JMS publisher class.

(Note: In here I used "JMSProxy" queue to publish messages.)

No comments:

Post a Comment