Here is blog posting #2 on subscribing to a topic in IBM MQ. More code examples – here is a Java IBM MQ (non-JMS) sample program that subscribes to a topic within a queue manager of IBM MQ. It will receive messages until a ‘no messages available’ exception is thrown, which happens when no message arrives within the 30-second wait interval.
You can download the source code from here.
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.CMQC;
/**
* Program Name
* MQSub01
*
* Description
* This java class will connect to a queue manager, subscribe to a topic and receive messages.
*
* Sample Command Line Parameters
* -h 127.0.0.1 -p 1414 -c TEST.CHL -m MQA1 -t topicString -o topicObject -u userId -x password
*
* @author Roger Lacroix, Capitalware Inc.
*/
public class MQSub01
{
    /**
     * Timestamp layout used as the prefix of every log line.
     * SimpleDateFormat is not thread-safe; within this class it is only used
     * by {@link #logger(String)}, which is called from the thread running main.
     */
    private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

    /** How long a single get waits for a message before giving up, in milliseconds. */
    private static final int WAIT_INTERVAL_MS = 30000;

    /** Command-line flags mapped to their values (e.g. "-h" to the host name). */
    private Hashtable<String, String> params = null;
    /** Connection properties handed to the MQQueueManager constructor. */
    private Hashtable<String, Object> mqht = null;
    private String qMgrName;
    /** Value of -t; may be null when only -o was supplied. */
    private String topicString = null;
    /** Value of -o; may be null when only -t was supplied. */
    private String topicObject = null;

    /**
     * The constructor. Logs a start-up line.
     */
    public MQSub01()
    {
        super();
        MQSub01.logger("Is now starting.");
    }

    /**
     * Make sure the required parameters are present: -h, -p, -c, -m, -u and -x,
     * at least one of -t / -o, and a -p value that parses as an integer.
     *
     * @return true when every check passes, false otherwise
     */
    private boolean allParamsPresent()
    {
        boolean connectionParams = params.containsKey("-h") && params.containsKey("-p") &&
                                   params.containsKey("-c") && params.containsKey("-m") &&
                                   params.containsKey("-u") && params.containsKey("-x");
        // Need at least one of topic string / topic object.
        boolean topicGiven = params.containsKey("-t") || params.containsKey("-o");
        if (!connectionParams || !topicGiven)
        {
            return false;
        }

        try
        {
            Integer.parseInt(params.get("-p"));
            return true;
        }
        catch (NumberFormatException e)
        {
            return false;
        }
    }

    /**
     * Extract the command-line parameters and initialize the MQ variables.
     *
     * @param args flag/value pairs, e.g. {@code -h 127.0.0.1 -p 1414 ...}
     * @throws IllegalArgumentException if args is null, empty, has an odd number of
     *         entries, or fails the checks in {@link #allParamsPresent()}
     */
    private void init(String[] args) throws IllegalArgumentException
    {
        params = new Hashtable<String, String>();
        if (args == null || args.length == 0 || (args.length % 2) != 0)
        {
            throw new IllegalArgumentException("Parameters must be given as flag/value pairs.");
        }
        for (int i = 0; i < args.length; i += 2)
        {
            params.put(args[i], args[i + 1]);
        }

        if (!allParamsPresent())
        {
            throw new IllegalArgumentException("A required parameter is missing or the port is not a number.");
        }

        qMgrName = params.get("-m");
        topicString = params.get("-t");
        topicObject = params.get("-o");

        mqht = new Hashtable<String, Object>();
        mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
        mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
        // allParamsPresent() has already proven that -p parses, so no fallback is needed here.
        mqht.put(CMQC.PORT_PROPERTY, Integer.valueOf(params.get("-p")));
        mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
        mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

        // I don't want to see MQ exceptions at the console.
        MQException.log = null;
    }

    /**
     * Connect, open topic, receive messages, close topic and disconnect.
     * <p>
     * Messages are read in a loop; the loop ends on the first MQException or
     * IOException. A reason code of MQRC_NO_MSG_AVAILABLE is reported as the
     * expected end of the run rather than as a failure. The topic and the queue
     * manager connection are released in the finally block whether or not the
     * run succeeded.
     */
    private void testSub()
    {
        int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING | CMQC.MQSO_MANAGED | CMQC.MQSO_NON_DURABLE;

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.options = CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.waitInterval = WAIT_INTERVAL_MS;

        MQQueueManager qMgr = null;
        MQTopic subscriber = null;
        try
        {
            qMgr = new MQQueueManager(qMgrName, mqht);
            MQSub01.logger("connected to queue manager: " + qMgrName);

            subscriber = qMgr.accessTopic(topicString,
                                          topicObject,
                                          CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION,
                                          openOptionsForGet);
            MQSub01.logger("subscribed to topic: " + subscriber.getName());

            boolean more = true;
            while (more)
            {
                try
                {
                    // A fresh message object per get, with no message/correlation id selection.
                    MQMessage mqMsg = new MQMessage();
                    mqMsg.messageId = CMQC.MQMI_NONE;
                    mqMsg.correlationId = CMQC.MQCI_NONE;

                    subscriber.get(mqMsg, gmo);

                    String msgText = mqMsg.readStringOfByteLength(mqMsg.getMessageLength());
                    MQSub01.logger("received message: " + msgText);
                }
                catch (MQException e)
                {
                    more = false;
                    if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // Expected way out: nothing arrived within the wait interval.
                        MQSub01.logger("no message available after " + WAIT_INTERVAL_MS + " ms, stopping (" + describe(e) + ")");
                    }
                    else
                    {
                        MQSub01.logger(describe(e));
                    }
                }
                catch (IOException e)
                {
                    more = false;
                    MQSub01.logger("IOException " + e.getLocalizedMessage());
                }
            }
        }
        catch (MQException e)
        {
            MQSub01.logger(describe(e));
        }
        finally
        {
            try
            {
                if (subscriber != null)
                {
                    subscriber.close();
                }
            }
            catch (MQException e)
            {
                MQSub01.logger(describe(e));
            }

            try
            {
                if (qMgr != null)
                {
                    qMgr.disconnect();
                }
            }
            catch (MQException e)
            {
                MQSub01.logger(describe(e));
            }
        }
        MQSub01.logger("Is now ending.");
    }

    /**
     * Format the completion and reason code of an MQException for logging.
     * Returns the text instead of logging it so that the log line still names
     * the method in which the exception was caught.
     *
     * @param e the exception to describe
     * @return text of the form {@code MQException CC=<cc> : RC=<rc>}
     */
    private static String describe(MQException e)
    {
        return "MQException CC=" + e.completionCode + " : RC=" + e.reasonCode;
    }

    /**
     * A simple logger method. Writes one line to standard output made of a
     * timestamp, the caller's simple class name, the caller's method name and
     * the supplied text.
     *
     * @param data the text to log
     */
    public static void logger(String data)
    {
        // Frame 0 is getStackTrace, frame 1 is this method, frame 2 is the caller.
        StackTraceElement[] stack = Thread.currentThread().getStackTrace();
        String className = "unknown";
        String methodName = "unknown";
        if (stack.length > 2)
        {
            StackTraceElement caller = stack[2];
            className = caller.getClassName();
            // Remove the package info.
            int lastDot = className.lastIndexOf('.');
            if (lastDot != -1)
            {
                className = className.substring(lastDot + 1);
            }
            methodName = caller.getMethodName();
        }

        System.out.println(LOGGER_TIMESTAMP.format(new Date()) + " " + className + ": " + methodName + ": " + data);
    }

    /**
     * main line. Parses the arguments and runs the subscription test; exits
     * with status 1 after printing the usage text when the arguments are
     * rejected, otherwise with status 0.
     *
     * @param args command-line parameters, see the usage text below
     */
    public static void main(String[] args)
    {
        MQSub01 mqs = new MQSub01();

        try
        {
            mqs.init(args);
            mqs.testSub();
        }
        catch (IllegalArgumentException e)
        {
            if (e.getMessage() != null)
            {
                System.out.println(e.getMessage());
            }
            System.out.println("Usage: java MQSub01 -h host -p port -c channel -m QueueManagerName -t topicString -o topicObject -u userId -x password");
            System.exit(1);
        }

        System.exit(0);
    }
}
Things to note:
- The allParamsPresent and init methods make sure all of the required parameters are present at program startup.
- The testSub method does the following:
- Connects to the queue manager
- Opens the specified topic as a subscription by either Topic String or Topic Object or both
- Loops while there are more messages – the wait interval for each get is set to 30 seconds
- Exits the loop when an exception is thrown – normally Reason Code 2033 (MQRC_NO_MSG_AVAILABLE), once no message has arrived within the wait interval
- Closes the topic
- Disconnects from the queue manager
Regards,
Roger Lacroix
Capitalware Inc.