Request & Reply JMS/MQRFH2 Messaging Scenario

Sometimes people over think a problem and write a complicated solution when really the solution is no different than any other Request/Reply scenario (use case).

On StackOverflow, someone asked a question about sending a reply JMS/MQRFH2 message using the RFH2 header and folder values from the original request message.

The code will copy the RFH2 header and folders from the request message to the reply message.

I did 3 extra things:

(1) Updated the destination queue name in the “jms” folder.

(2) Set the reply message’s Correlation Id to contain the request message’s Message Id.

(3) Used the checksum of the request message’s payload as a seed value for the random number generator that generated Powerball numbers for the reply message’s payload.

You can download the source code from here.

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.Random;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.headers.MQDataException;
import com.ibm.mq.headers.MQRFH2;

/**
 * Program Name
 *  MQTest77
 *
 * Description
 *  This java class will connect to a remote queue manager with the MQ setting stored in a HashTable.
 *  It will get a request MQRFH2 message from an input queue and then send a reply MQRFH2 message.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q INPUT.Q1 -o OUTPUT.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTest77
{
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String,String> params;
   private Hashtable<String,Object> mqht;
   private String qMgrName;
   private String inputQName;
   private String outputQName;

   /**
    * The constructor
    */
   public MQTest77()
   {
      super();
      params = new Hashtable<String,String>();
      mqht = new Hashtable<String,Object>();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") && params.containsKey("-o") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ HashTable.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      int port = 1414;
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         qMgrName = (String) params.get("-m");
         inputQName = (String) params.get("-q");
         outputQName = (String) params.get("-o");

         try
         {
            port = Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            port = 1414;
         }

         mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
         mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
         mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
         mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
         mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * This method will do the following:
    * 1. Connect 
    * 2. Open input queue, get a request message, close queue 
    * 3. Open output queue, send a reply message, close queue
    * 4. Disconnect.
    */
   private void doIt()
   {
      MQQueueManager qMgr = null;
      
      try
      {
         qMgr = new MQQueueManager(qMgrName, mqht);
         MQTest77.logger("successfully connected to "+ qMgrName);
         
         MQMessage requestMsg = getRequestMsg(qMgr);
         
         if (requestMsg != null)
         {
            ResponseStuff rs = doSomeLogic(requestMsg);
            if (rs != null)
               sendReplyMsg(qMgr, rs);
         }
      }
      catch (MQException e)
      {
         MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      finally
      {
         try
         {
            if (qMgr != null)
            {
               qMgr.disconnect();
               MQTest77.logger("disconnected from "+ qMgrName);
            }
         }
         catch (MQException e)
         {
            MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
   }

   /**
    * Open the input queue, get a message, close queue.
    * @param qMgr
    * @return requestMsg
    */
   private MQMessage getRequestMsg(MQQueueManager qMgr)
   {
      MQQueue queue = null;
      int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF + CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING;
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_PROPERTIES_FORCE_MQRFH2 + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_NO_WAIT;
      MQMessage requestMsg = null;
      
      try
      {
         queue = qMgr.accessQueue(inputQName, openOptions);
         MQTest77.logger("successfully opened "+ inputQName);

         requestMsg = new MQMessage();
         requestMsg.messageId = CMQC.MQMI_NONE;
         requestMsg.correlationId = CMQC.MQCI_NONE;

         // get the message on the queue
         queue.get(requestMsg, gmo);
      }
      catch (MQException e)
      {
         MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         
         if ( (e.completionCode == CMQC.MQCC_FAILED) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) )
         {
            MQTest77.logger("No messages on the queue: " + inputQName);
            requestMsg = null;  // no message on queue
         }
      }
      finally
      {
         try
         {
            if (queue != null)
            {
               queue.close();
               MQTest77.logger("closed input: "+ inputQName);
            }
         }
         catch (MQException e)
         {
            MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
      
      return requestMsg;
   }

   /**
    * This is the method where you would normally do business logic stuff.
    * This code will we will take the checksum of the request
    * message and use the value as a seed to the random number generator
    * to that will generate numbers for the next PowerBall lottery!!
    * 
    * @param requestMsg
    */
   private ResponseStuff doSomeLogic(MQMessage requestMsg)
   {
      SimpleDateFormat date = new SimpleDateFormat("yyyy/MM/dd");
      ResponseStuff rs = new ResponseStuff();
      byte[] msgInBytes;
      
      try
      {
         rs.setRequestMsgId(requestMsg.messageId);
         
         requestMsg.seek(0);  // Important: put the cursor at the beginning of the message.
         MQRFH2 requestRFH2 = new MQRFH2(requestMsg);

         // Save the RFH2 Values
         rs.setRfh2(extractRFH2Values(requestRFH2));
         
         if (CMQC.MQFMT_STRING.equals(requestRFH2.getFormat()))
         {
            String msgStr = requestMsg.readStringOfByteLength(requestMsg.getDataLength());
            msgInBytes = msgStr.getBytes();
         }
         else
         {
            msgInBytes = new byte[requestMsg.getDataLength()];
            requestMsg.readFully(msgInBytes);
         }
         
         /**
          * Now do something with the request message payload.
          * i.e. process msgStr
          * 
          * But we are going to generate PowerBall lottery numbers.
          * 
          */

         /* Apply checksum to the message payload */
         Checksum checksum = new CRC32();
         checksum.update(msgInBytes, 0, msgInBytes.length);

         // get the current checksum value & use it as a seed value.
         Random random = new Random(checksum.getValue());

         StringBuffer sb = new StringBuffer();
         sb.append("For today, ");
         sb.append(date.format(new Date()));
         sb.append(" your Powerball numbers are: ");
         for (int i=0; i < 4; i++)
            sb.append(random.nextInt(69)+", ");
         sb.append(random.nextInt(69)+" ");
         sb.append("and your power ball is ");
         sb.append(random.nextInt(26));
         
         rs.setResponseText(sb.toString());
      }
      catch (IOException e)
      {
         MQTest77.logger("IOException:" +e.getLocalizedMessage());
      }
      catch (MQDataException e)
      {
         MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      
      return rs;
   }

   /**
    * Extract the MQRFH2 values we will need.
    * @param requestRFH2
    * @return rfh2
    */
   private MQRFH2 extractRFH2Values(MQRFH2 requestRFH2)
   {
      MQRFH2 replyRFH2 = null;
      
      try
      {
         // Set the RFH2 Values
         replyRFH2 = new MQRFH2();
         replyRFH2.setEncoding(requestRFH2.getEncoding());
         replyRFH2.setCodedCharSetId(requestRFH2.getCodedCharSetId());
         replyRFH2.setFormat(requestRFH2.getFormat());
         replyRFH2.setFlags(requestRFH2.getFlags());
         replyRFH2.setNameValueCCSID(requestRFH2.getNameValueCCSID());

         String[] folders = requestRFH2.getFolderStrings();
         for (int i=0; i < folders.length; i++)
         {
            if ( (folders[i].startsWith("<jms>")) && (folders[i].contains("<Dst>")) && (folders[i].contains("</Dst>")) )
            {
               int prefix = folders[i].indexOf("<Dst>");
               int suffix = folders[i].indexOf("</Dst>");
               // Set the destination to the output queue name.
               folders[i] = folders[i].substring(0, prefix) + "<Dst>queue:///"+outputQName+"</Dst>" + folders[i].substring(suffix+6);
               break;
            }
         }
         replyRFH2.setFolderStrings(folders);
      }
      catch (IOException e)
      {
         MQTest77.logger("IOException:" +e.getLocalizedMessage());
      }
      
      return replyRFH2;
   }


   /**
    * Open output queue, write a message, close queue.
    * @param qMgr
    * @param requestMsg
    */
   private void sendReplyMsg(MQQueueManager qMgr, ResponseStuff rs)
   {
      MQQueue queue = null;
      int openOptions = CMQC.MQOO_OUTPUT + CMQC.MQOO_FAIL_IF_QUIESCING;
      MQPutMessageOptions pmo = new MQPutMessageOptions();

      try
      {
         queue = qMgr.accessQueue(outputQName, openOptions);
         MQTest77.logger("successfully opened "+ outputQName);

         MQMessage replymsg = new MQMessage();

         // Set the RFH2 Values
         MQRFH2 replyRFH2 = new MQRFH2();
         replyRFH2.setEncoding(rs.getRfh2().getEncoding());
         replyRFH2.setCodedCharSetId(rs.getRfh2().getCodedCharSetId());
         replyRFH2.setFormat(rs.getRfh2().getFormat());
         replyRFH2.setFlags(rs.getRfh2().getFlags());
         replyRFH2.setNameValueCCSID(rs.getRfh2().getNameValueCCSID());
         replyRFH2.setFolderStrings(rs.getRfh2().getFolderStrings());
         
         // Set the MQRFH2 structure to the message
         replyRFH2.write(replymsg);

         // Write message data
         replymsg.writeString(rs.getResponseText());

         // Set MQMD values
         replymsg.messageId = CMQC.MQMI_NONE;
         /* set the reply's Correl Id to be the request message's Msg id */  
         replymsg.correlationId = rs.getRequestMsgId();
         replymsg.messageType = CMQC.MQMT_DATAGRAM;
         
         // IMPORTANT: Set the format to MQRFH2 aka JMS Message.
         replymsg.format = CMQC.MQFMT_RF_HEADER_2;

         // put the message on the queue
         queue.put(replymsg, pmo);
         MQTest77.logger("Message Data>>>" + rs.getResponseText());
      }
      catch (MQException e)
      {
         MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      catch (IOException e)
      {
         MQTest77.logger("IOException:" +e.getLocalizedMessage());
      }
      finally
      {
         try
         {
            if (queue != null)
            {
               queue.close();
               MQTest77.logger("closed output: "+ outputQName);
            }
         }
         catch (MQException e)
         {
            MQTest77.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * main line
    * @param args
    */
   public static void main(String[] args)
   {
      MQTest77 write = new MQTest77();

      try
      {
         write.init(args);
         write.doIt();
      }
      catch (IllegalArgumentException e)
      {
         MQTest77.logger("Usage: java MQTest77 -m QueueManagerName -h host -p port -c channel -q InputQueueName -o OutputQueueName-u UserID -x Password");
         System.exit(1);
      }

      System.exit(0);
   }
   
   /**
    * This class will hold the information to be used in the reply message.
    */
   class ResponseStuff
   {
      private MQRFH2 rfh2;
      private String responseText;
      private byte[] requestMsgId;

      /**
       * The constructor
       */
      public ResponseStuff()
      {
         super();
         rfh2 = new MQRFH2();
      }

      public MQRFH2 getRfh2()
      {
         return rfh2;
      }

      public void setRfh2(MQRFH2 rfh2)
      {
         this.rfh2 = rfh2;
      }

      public String getResponseText()
      {
         return responseText;
      }

      public void setResponseText(String responseText)
      {
         this.responseText = responseText;
      }

      public byte[] getRequestMsgId()
      {
         return requestMsgId;
      }

      public void setRequestMsgId(byte[] requestMsgId)
      {
         this.requestMsgId = requestMsgId;
      }

   }
}

Regards,
Roger Lacroix
Capitalware Inc.

This entry was posted in HPE NonStop, IBM i (OS/400), IBM MQ, IBM MQ Appliance, Java, JMS, Linux, macOS (Mac OS X), Open Source, Programming, Raspberry Pi, Unix, Windows, z/OS.

Comments are closed.