In this post we will write a JMS publisher for a queue in WSO2 MB. Let's begin.
Prerequisites:
- The WSO2 MB pack should be downloaded.
- MB should be up and running (follow the WSO2 MB documentation if you are not familiar with MB).
- Java should be installed (the JMS publisher is implemented as a Java class).
Environment used:
- Oracle JDK 1.8
- MB 3.1.0
- IntelliJ IDEA as the IDE
JMS Publisher class:
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * This class publishes messages to an MB queue.
 */
public class QueuePublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";

    String userName = "admin";
    String password = "admin";
    String queueName = "JMSProxy"; // Change this to the name of your queue
    int publishMsgCount = 5;

    public static void main(String[] args) throws NamingException, JMSException {
        QueuePublisher queuePublisher = new QueuePublisher();
        queuePublisher.sendMessages();
    }

    public void sendMessages() throws NamingException, JMSException {
        String connectionUrl = getTCPConnectionURL(userName, password);
        // JNDI properties: context factory, connection factory URL and queue binding
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);
        InitialContext ctx = new InitialContext(properties);
        // Look up the connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        QueueConnection queueConnection = connFactory.createQueueConnection();
        try {
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) ctx.lookup(queueName);
            QueueSender queueSender = queueSession.createSender(queue);
            // Send publishMsgCount messages to the queue looked up above
            for (int i = 1; i <= publishMsgCount; i++) {
                TextMessage textMessage = queueSession.createTextMessage("::::Test Message:::: " + i + ":::Publish to queue:::" + queueName + ":::From IP:::" + CARBON_DEFAULT_HOSTNAME);
                textMessage.setStringProperty("msgID", String.valueOf(i));
                queueSender.send(textMessage);
                System.out.println("Publishing Test Message " + i + "::Published From IP::" + CARBON_DEFAULT_HOSTNAME);
            }
            queueSender.close();
            queueSession.close();
        } finally {
            // Always release the connection, even if publishing fails
            queueConnection.close();
        }
    }

    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Note: this Java class needs the MB client libraries as dependencies. You can find the relevant JARs in $MB_HOME/client-lib/.
Add all of the following JARs as dependencies:
- andes-client-3.1.1.jar
- geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
- log4j-1.2.13.jar
- org.wso2.carbon.logging-4.4.1.jar
- org.wso2.securevault-1.0.0-wso2v2.jar
- slf4j-1.5.10.wso2v1.jar
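If you want to check what the publisher hands to JNDI before involving the broker, the following is a minimal sketch that only builds and prints the connection URL and the JNDI properties. It needs nothing beyond the JDK. The class name JndiPropertiesSketch and its two methods are hypothetical names chosen for illustration; the values (admin/admin, localhost, 5672, JMSProxy) mirror the ones used in QueuePublisher, so treat them as assumptions about your own setup.

```java
import javax.naming.Context;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the WSO2 client library.
class JndiPropertiesSketch {

    // Builds amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
    static String buildConnectionUrl(String username, String password, String host, int port) {
        return "amqp://" + username + ":" + password
                + "@carbon/carbon"
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // Assembles the same three JNDI entries that QueuePublisher passes to InitialContext.
    static Properties buildJndiProperties(String connectionUrl, String queueName) {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        properties.put("connectionfactory.qpidConnectionfactory", connectionUrl);
        properties.put("queue." + queueName, queueName);
        return properties;
    }

    public static void main(String[] args) {
        String url = buildConnectionUrl("admin", "admin", "localhost", 5672);
        Properties properties = buildJndiProperties(url, "JMSProxy");
        // Print every entry so typos in the URL or queue name are easy to spot
        for (String key : properties.stringPropertyNames()) {
            System.out.println(key + " = " + properties.getProperty(key));
        }
    }
}
```

The queue entry uses the queue name as both the JNDI key suffix and the value, which is why the publisher can look the queue up with ctx.lookup(queueName).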
That completes the JMS publisher. In MB, add the queue you want to publish messages to, and set its name as queueName in the JMS publisher class.
(Note: I used the "JMSProxy" queue to publish messages here.)
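If you would rather not edit the class every time the queue changes, one option is to take the queue name from the command line and fall back to "JMSProxy". The sketch below shows only that selection logic. QueueNameSketch and resolveQueueName are hypothetical names and are not part of the publisher above; you would still need to assign the result to queueName yourself.

```java
// Hypothetical helper for illustration only; not part of the QueuePublisher class.
class QueueNameSketch {
    static final String DEFAULT_QUEUE_NAME = "JMSProxy";

    // Returns the first command-line argument as the queue name,
    // or the default when no argument (or only whitespace) is given.
    static String resolveQueueName(String[] args) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        return DEFAULT_QUEUE_NAME;
    }

    public static void main(String[] args) {
        String queueName = resolveQueueName(args);
        System.out.println("Publishing to queue: " + queueName);
    }
}
```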