Sample Java Code to Subscribe to an IBM MQ Topic

Here is blog posting # 2 on subscribing to a topic in IBM MQ. More code examples – here is a Java IBM MQ (non-JMS) sample program to subscribe to a topic within a queue manager of IBM MQ. It will receives messages until ‘no messages available’ exception which is set for 30 seconds.

You can download the source code from here.

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.CMQC;

/**
 * Program Name
 *  MQSub01
 *
 * Description
 *  This java class will connect to a queue manager, subscribe to a topic and receive messages.
 *
 * Sample Command Line Parameters
 *  -h 127.0.0.1 -p 1414 -c TEST.CHL -m MQA1 -t topicString -o topicObject -u userId -x password
 *
 * @author Roger Lacroix, Capitalware Inc.
 */
public class MQSub01
{
   private static final SimpleDateFormat  lOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
   
   private Hashtable<String, String> params = null;
   private Hashtable<String, Object> mqht = null;
   private String qMgrName;
   private String topicString = null;
   private String topicObject = null;

   /**
    * The constructor
    */
   public MQSub01()
   {
      super();
      MQSub01.logger("Is now starting.");
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-u") && params.containsKey("-x");
      
      // Need at least one
      if ( (!params.containsKey("-t")) && (!params.containsKey("-o")) )
         b = false;
      
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ variables.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      params = new Hashtable<String, String>();
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         qMgrName = (String) params.get("-m");
         topicString = (String) params.get("-t");
         topicObject = (String) params.get("-o");
         
         mqht = new Hashtable<String, Object>();

         mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
         mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));

         try
         {
            mqht.put(CMQC.PORT_PROPERTY, new Integer(params.get("-p")));
         }
         catch (NumberFormatException e)
         {
            mqht.put(CMQC.PORT_PROPERTY, new Integer(1414));
         }
         
         mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
         mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Connect, open topic, receive messages, close topic and disconnect.
    *
    */
   private void testSub()
   {
      int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING | CMQC.MQSO_MANAGED | CMQC.MQSO_NON_DURABLE;
      MQQueueManager _qMgr = null;
      MQTopic subscriber = null;
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      gmo.waitInterval = 30000;  // wait up to 30 seconds
      MQMessage mqMsg = null;
      String msgText = null;
      boolean more = true;

      try
      {
         _qMgr = new MQQueueManager(qMgrName, mqht);
         MQSub01.logger("connected to queue manager: " + qMgrName);
         
         subscriber = _qMgr.accessTopic( topicString,
                                         topicObject,
                                         CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION,
                                         openOptionsForGet);
         logger("subscribed to topic: " + subscriber.getName());

         while (more)
         {
            try
            {
               mqMsg = new MQMessage();
               mqMsg.messageId = CMQC.MQMI_NONE;
               mqMsg.correlationId = CMQC.MQCI_NONE;
               subscriber.get(mqMsg, gmo);
               msgText = mqMsg.readStringOfByteLength(mqMsg.getMessageLength());
               MQSub01.logger("received message: " + msgText);
            }
            catch (MQException e)
            {
               more = false;
               MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }
            catch (IOException e)
            {
               more = false;
               MQSub01.logger("IOException " + e.getLocalizedMessage());
            }
         }
      }
      catch (MQException e)
      {
         MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      finally
      {
         try
         {
            if (subscriber != null)
               subscriber.close();
         }
         catch (MQException e)
         {
            MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }

         try
         {
            if (_qMgr != null)
               _qMgr.disconnect();
         }
         catch (MQException e)
         {
            MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
      
      MQSub01.logger("Is now ending.");
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(lOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * main line
    * @param args
    */
   public static void main(String[] args)
   {
      MQSub01 mqs = new MQSub01();

      try
      {
         mqs.init(args);
         mqs.testSub();
      }
      catch (IllegalArgumentException e)
      {
         System.out.println("Usage: java MQSub01 -h host -p port -c channel -m QueueManagerName -t topicString -o topicObject -u userId -x password");
         System.exit(1);
      }

      System.exit(0);
   }
}

Things to note:

  • The allParamsPresent and init methods makes sure all of the required parameters are present at program startup.
  • The testSub method does the following:
    • Connects to the queue manager
    • Opens the specified topic as a subscription by either Topic String or Topic Object or both
    • Loop while there are more messages – the wait interval is set to 30 seconds
    • When Reason Code of 2033 (MQRC_NO_MSG_AVAILABLE) exception is thrown then exit the loop
    • Closes the topic
    • Disconnects from the queue manager

Regards,
Roger Lacroix
Capitalware Inc.

This entry was posted in IBM i (OS/400), Java, Linux, Mac OS X, MQ, Programming, Unix, Windows, z/OS.

Leave a Reply

Your email address will not be published. Required fields are marked *

*