MQ Request/Reply Scenario with Async Getter Thread

Continuing on from the previous blog posting, the StackOverflow user says that s/he wants the getter component to run in a separate thread.

To me, the request does not make a lot of sense. If the user wants to do a Get by CorrelId, then they should be coding for synchronous processing, not asynchronous.

Still, I have created a fully functioning Java/MQ application that performs the request/reply as an asynchronous process. The code uses Java's BlockingQueue to communicate between the 2 threads. The main thread puts 10 messages on the queue, then passes 3 arbitrary CorrelIds (0005, 0003 and 0007) to the Getter thread over the blocking queue so that it can perform a Get by CorrelId. It then passes a 4th CorrelId (0099) that matches no message, to show how a failure would work.
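
The hand-off between the two threads can be sketched on its own, without any MQ calls. The following is a minimal illustration and not the program itself: the class name HandOffSketch and its run method are hypothetical, and it uses take(), which blocks until an item arrives, whereas the full program below polls and sleeps.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-alone sketch: no MQ calls, only the thread hand-off.
public class HandOffSketch
{
   private static final String QUIT_MSG = "QUIT";

   /**
    * Sends the given CorrelIds to a consumer thread, followed by QUIT.
    * Returns the CorrelIds in the order the consumer thread saw them.
    */
   public static List<String> run(String... correlIds)
   {
      BlockingQueue<Object> bq = new ArrayBlockingQueue<Object>(100);
      List<String> seen = new ArrayList<String>();

      Thread getter = new Thread(() -> {
         try
         {
            while (true)
            {
               Object o = bq.take();   // blocks until something is available
               if (o instanceof byte[])
                  seen.add(new String((byte[]) o, StandardCharsets.UTF_8));
               else if (QUIT_MSG.equals(o))
                  break;
            }
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
         }
      });
      getter.start();

      try
      {
         for (String id : correlIds)
            bq.put(id.getBytes(StandardCharsets.UTF_8));
         bq.put(QUIT_MSG);   // tell the consumer we are done
         getter.join();      // after join(), the consumer's writes are visible
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted during hand-off", ie);
      }
      return seen;
   }

   public static void main(String[] args)
   {
      System.out.println(run("0005", "0003", "0007", "0099"));
   }
}
```

A byte[] item means "get this CorrelId" and the QUIT string ends the loop, which is the same convention the full program uses on its blocking queue.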

You can download the source code from here.

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Program Name
 *  MQTest11Async
 *
 * Description
 *  This Java class will connect to a remote queue manager with the
 *  MQ settings stored in a Hashtable.
 *  
 *  Functionality:
 *  - Parse input parameters
 *  - Start a child thread for retrieving messages by CorrelId
 *  - Connect and open queue 
 *  - Put 10 messages on a queue with unique CorrelIds
 *  - Pass the CorrelId to the child thread via the Blocking Queue
 *  - Close and disconnect
 *  
 *  Child thread:
 *  - Connect and open queue
 *  - Wait on Blocking Queue
 *  - Retrieve the message by CorrelId
 *  - When the QUIT message is received exit loop
 *  - Close and disconnect
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTest11Async
{
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
   private static final String            QUIT_MSG = "QUIT";
   private Hashtable<String,String>       params = new Hashtable<String,String>();
   private Hashtable<String,Object>       mqht = new Hashtable<String,Object>();

   /**
    * The constructor
    */
   public MQTest11Async()
   {
      super();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ Hashtable.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      int port = 1414;
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {

         try
         {
            port = Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            port = 1414;
         }
         
         mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
         mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
         mqht.put(CMQC.PORT_PROPERTY, Integer.valueOf(port));
         mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
         mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Connect, open queue, write 10 messages, close queue and disconnect.
    */
   private void testSend()
   {
      MQPutMessageOptions     pmo = new MQPutMessageOptions();
      pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
      MQQueueManager          qMgr = null;
      MQQueue                 queue = null;
      MQMessage               sendmsg;
      String                  msgData;
      DecimalFormat           df = new DecimalFormat("0000");
      BlockingQueue<Object>   toGetter_BQ = new ArrayBlockingQueue<Object>(100);
      String                  qMgrName = (String) params.get("-m");
      String                  outputQName = (String) params.get("-q");

      /**
       * Start up child "getter" thread.
       */
      Thread t1 = new Thread(new Getter(toGetter_BQ, qMgrName, outputQName));
      t1.start();

      try
      {
         qMgr = new MQQueueManager(qMgrName, mqht);
         logger("successfully connected to "+ qMgrName);

         queue = qMgr.accessQueue(outputQName, CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING);
         logger("successfully opened "+ outputQName);
         
         /*
          * Code to send 10 messages with a specific CorrelId.  i.e. 0001, 0002, etc.
          */
         for (int i=0; i < 10; i++)
         {
            // Define a simple MQ message, and write some text
            sendmsg = new MQMessage();
            sendmsg.format = CMQC.MQFMT_STRING;
            sendmsg.messageId = CMQC.MQMI_NONE;
            sendmsg.correlationId = df.format(i+1).getBytes();

            // Write message data
            msgData = "This is a test message from MQTest11Async. CorrelID is "+new String(sendmsg.correlationId);
            sendmsg.writeString(msgData);

            // put the message on the queue
            queue.put(sendmsg, pmo);
            logger("Sent: Message Data>>>" + msgData);
         }
         
         try
         {
            // Put 3 arbitrary CorrelIds on the blocking queue for the child thread to retrieve.
            toGetter_BQ.put(df.format(5).getBytes());
            toGetter_BQ.put(df.format(3).getBytes());
            toGetter_BQ.put(df.format(7).getBytes());

            // Put an unknown CorrelId
            toGetter_BQ.put(df.format(99).getBytes());

            // Ok, tell child we are done.
            toGetter_BQ.put(QUIT_MSG);
         }
         catch (InterruptedException ie)
         {
            logger("InterruptedException: "+ie.getMessage());
         }
      }
      catch (MQException e)
      {
         logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      catch (IOException e)
      {
         logger("IOException:" +e.getLocalizedMessage());
      }
      finally
      {
         try
         {
            if (queue != null)
            {
               queue.close();
               logger("closed: "+ outputQName);
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
         try
         {
            if (qMgr != null)
            {
               qMgr.disconnect();
               logger("disconnected from "+ qMgrName);
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
      
      try
      {
         // Wait for child thread to be done.
         t1.join();
      }
      catch (InterruptedException ie)
      {
         logger("InterruptedException: "+ie.getMessage());
      }
   }

   /**
    * Getter class to be run as a separate thread to retrieve messages from a queue. 
    * Connect, open queue, wait on blocking queue for instructions on what to do.  
    * Loop until we get the QUIT message then close queue and disconnect. 
    */
   class Getter implements Runnable 
   {
      private BlockingQueue<Object> toGetter_BQ;
      private String                qMgrName;
      private String                replyQName;
      private boolean               working = true;

      /**
       * The constructor
       * @param toGetter_BQ Blocking Queue
       * @param qMgrName Queue Manager name
       * @param replyQName Reply queue name
       */
      public Getter(BlockingQueue<Object> toGetter_BQ, String qMgrName, String replyQName)
      {
         super();
         this.toGetter_BQ = toGetter_BQ;
         this.qMgrName = qMgrName;
         this.replyQName = replyQName;
      }
      
      @Override
      public void run() 
      {
         Object         o;
         MQQueueManager qMgr = null;
         MQQueue        queue = null;

         try
         {
            qMgr = new MQQueueManager(qMgrName, mqht);
            logger("successfully connected to "+ qMgrName);

            queue = qMgr.accessQueue(replyQName, CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING);
            logger("successfully opened "+ replyQName);
            
            while (working)
            {
               // poll returns immediately with either an object or null
               o = toGetter_BQ.poll();

               /*
                * Check what we got off the blocking queue.
                */
               if (o == null)
               {
                  try
                  {
                     // Nothing to do! Put a slight pause in the loop. 
                     if (working)
                        Thread.sleep(50); // time in milliseconds
                  }
                  catch (InterruptedException ie)
                  {}
               }
               else if (o instanceof byte[])
               {
                  logger("Retrieve the next CorrelId: " + new String((byte[])o));
                  getMessage((byte[])o, queue);
               }
               else if (o instanceof String)
               {
                  if (QUIT_MSG.equals((String)o))
                  {
                     logger("quitting time. ");
                     working = false;
                  }
                  else
                  {
                     logger("Error: unknown string command: " + (String)o);
                  }
               }
               else
               {
                  logger("Error: unknown object passed into BlockingQueue.");
               }
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
         finally
         {
            try
            {
               if (queue != null)
               {
                  queue.close();
                  logger("closed: "+ replyQName);
               }
            }
            catch (MQException e)
            {
               logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }
            try
            {
               if (qMgr != null)
               {
                  qMgr.disconnect();
                  logger("disconnected from "+ qMgrName);
               }
            }
            catch (MQException e)
            {
               logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }
         }
      }

      /**
       * Retrieve a message by specific CorrelId
       * @param correlId
       * @param queue
       */
      private void getMessage(byte[] correlId, MQQueue queue)
      {
         logger("Attempting to get message from queue.");
         
         MQGetMessageOptions gmo = new MQGetMessageOptions();
         gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
         gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
         gmo.waitInterval = 5000; // 5 seconds or you can use CMQC.MQWI_UNLIMITED
         
         // Define the receive message and set the CorrelId to match on
         MQMessage receiveMsg = new MQMessage();
         receiveMsg.messageId = CMQC.MQMI_NONE;
         receiveMsg.correlationId = correlId;

         try
         {
            // get the message from the queue
            queue.get(receiveMsg, gmo);

            if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
            {
               String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
               logger("Received: Message Data>>>" + msgStr);
            }
            else
            {
               byte[] b = new byte[receiveMsg.getMessageLength()];
               receiveMsg.readFully(b);
               logger("Received: Message Data>>>" + new String(b));
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
         catch (IOException e)
         {
            logger("IOException:" +e.getLocalizedMessage());
         }
      }
   }

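   /**
    * A simple logger method. Synchronized because both threads call it
    * and the shared SimpleDateFormat is not thread-safe.
    * @param data
    */
   public static synchronized void logger(String data)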
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * main line
    * @param args
    */
   public static void main(String[] args)
   {
      MQTest11Async mqta = new MQTest11Async();

      try
      {
         mqta.init(args);
         mqta.testSend();
         
         logger("exiting.");
      }
      catch (IllegalArgumentException e)
      {
         logger("Usage: java MQTest11Async -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
         System.exit(1);
      }

      System.exit(0);
   }
}
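
One detail worth making explicit: an MQ CorrelId is a 24-byte field, while the program above builds it from a 4-character string using the platform's default character set. Below is a minimal sketch of a helper that pins down both the length and the encoding. The names CorrelIdSketch and toCorrelId are hypothetical and not part of the IBM MQ API, and the choice of zero padding and UTF-8 is an assumption. The putter and the getter would both need to use the same helper so that the bytes match.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of the IBM MQ classes.
public class CorrelIdSketch
{
   // Length of an MQ correlation identifier in bytes (MQ_CORREL_ID_LENGTH).
   private static final int CORREL_ID_LENGTH = 24;

   /**
    * Encode the text as UTF-8 and pad it with zero bytes to 24 bytes.
    * @param text the readable CorrelId, e.g. "0005"
    * @return a 24-byte array
    */
   public static byte[] toCorrelId(String text)
   {
      byte[] raw = text.getBytes(StandardCharsets.UTF_8);
      if (raw.length > CORREL_ID_LENGTH)
         throw new IllegalArgumentException("CorrelId longer than 24 bytes: " + text);

      // Arrays.copyOf pads the tail of the new array with zero bytes.
      return Arrays.copyOf(raw, CORREL_ID_LENGTH);
   }

   public static void main(String[] args)
   {
      byte[] id = toCorrelId("0005");
      System.out.println(id.length);   // prints 24
   }
}
```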

And the output will look like:

2021/07/08 13:27:04.684 MQTest11Async: testSend: successfully connected to MQA1
2021/07/08 13:27:04.684 MQTest11Async$Getter: run: successfully connected to MQA1
2021/07/08 13:27:04.700 MQTest11Async: testSend: successfully opened TEST.Q1
2021/07/08 13:27:04.700 MQTest11Async$Getter: run: successfully opened TEST.Q1
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0001
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0002
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0004
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0006
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0008
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0009
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0010
2021/07/08 13:27:04.700 MQTest11Async: testSend: closed: TEST.Q1
2021/07/08 13:27:04.715 MQTest11Async: testSend: disconnected from MQA1
2021/07/08 13:27:04.762 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0005
2021/07/08 13:27:04.762 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0003
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0007
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0099
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:09.774 MQTest11Async$Getter: getMessage: CC=2 : RC=2033
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: quitting time.
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: closed: TEST.Q1
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: disconnected from MQA1
2021/07/08 13:27:09.774 MQTest11Async: main: exiting.
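
In the output above, the request for CorrelId 0099 fails after roughly 5 seconds (13:27:04.769 to 13:27:09.774), which matches the 5000 ms waitInterval. CC=2 is MQCC_FAILED and RC=2033 is MQRC_NO_MSG_AVAILABLE. As a small sketch of how the log line could be made easier to read, the hypothetical helper below maps a few well-known reason codes to their names. It is not part of the IBM MQ classes and covers only the codes listed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for readable log lines; covers only a handful of codes.
public class ReasonCodeSketch
{
   private static final Map<Integer,String> NAMES = new HashMap<Integer,String>();
   static
   {
      NAMES.put(2009, "MQRC_CONNECTION_BROKEN");
      NAMES.put(2033, "MQRC_NO_MSG_AVAILABLE");
      NAMES.put(2035, "MQRC_NOT_AUTHORIZED");
      NAMES.put(2059, "MQRC_Q_MGR_NOT_AVAILABLE");
      NAMES.put(2085, "MQRC_UNKNOWN_OBJECT_NAME");
   }

   /**
    * Format a completion code and reason code the way the program logs them,
    * appending the reason code name when it is one of the known few.
    */
   public static String describe(int cc, int rc)
   {
      String name = NAMES.get(rc);
      return "CC=" + cc + " : RC=" + rc + (name != null ? " (" + name + ")" : "");
   }

   public static void main(String[] args)
   {
      // prints: CC=2 : RC=2033 (MQRC_NO_MSG_AVAILABLE)
      System.out.println(describe(2, 2033));
   }
}
```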

Regards,
Roger Lacroix
Capitalware Inc.
