Java MQ Code to Retrieve Messages from a Queue Based on a Filter

There is a common misconception about message selection (filtering) in MQ/JMS. Some people think that the IBM MQ queue manager does something special for JMS applications that it does not do for plain Java or C/C++/C#/COBOL applications. That is simply NOT true. Note: C/C++/C#/COBOL applications can use the SelectionString field of the MQOD structure to perform message selection.

When you read a paragraph quickly, it is easy to miss its subtle nuances. See the first paragraph of "Message selectors in JMS" in the Knowledge Center. It says:

Messages can contain application-defined property values. An application can use message selectors to have a JMS provider filter messages.

It doesn’t say that the queue manager will filter the messages; it says “JMS provider”. The JMS provider, aka the JMS client library, performs the filtering of the messages.

So, I decided to create a simple POJO (Plain Old Java Object) class that can be used as a message selector in your POJO MQ application. There is nothing complicated about the code. It browses the messages on the queue and checks a message property for a matching value. If the value matches, the message is removed from the queue and returned to the application; otherwise, it continues to the next message. This is done until all messages have been checked (RC of 2033 – MQRC_NO_MSG_AVAILABLE).
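To make that flow concrete before looking at the real class, here is a minimal sketch that models it on an in-memory list instead of a real queue. Everything in it is an assumption made for illustration: InMemorySelectorSketch, its message helper and the use of a Map as a "message" are hypothetical names, no IBM MQ API is involved, and returning null stands in for reason code 2033.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a queue: each "message" is just a map of properties.
// InMemorySelectorSketch is a hypothetical name, not part of IBM MQ.
class InMemorySelectorSketch
{
   private final List<Map<String, Object>> queue;
   private int cursor = 0;   // plays the role of the browse cursor

   InMemorySelectorSketch(List<Map<String, Object>> queue)
   {
      this.queue = queue;
   }

   /**
    * Browse forward from the cursor (or from the start when reset is true),
    * then remove and return the first message whose Integer property is >= minimum.
    * Returns null at the end of the list, where the real class surfaces 2033.
    */
   Map<String, Object> getMessage(boolean reset, String name, int minimum)
   {
      if (reset)
         cursor = 0;

      while (cursor < queue.size())
      {
         Map<String, Object> msg = queue.get(cursor);   // non-destructive "browse"
         Object value = msg.get(name);

         if ( (value instanceof Integer) && ((Integer) value >= minimum) )
         {
            queue.remove(cursor);   // destructive "get" of the message under the cursor
            return msg;             // cursor now points at the following message
         }
         cursor++;                  // no match: leave the message, browse the next one
      }
      return null;
   }

   // Build a test message; a null someNum means the property is absent.
   static Map<String, Object> message(String id, Object someNum)
   {
      Map<String, Object> m = new HashMap<>();
      m.put("id", id);
      if (someNum != null)
         m.put("SomeNum", someNum);
      return m;
   }

   public static void main(String[] args)
   {
      List<Map<String, Object>> q = new ArrayList<>();
      q.add(message("A", 100));
      q.add(message("B", 123));
      q.add(message("C", null));   // no SomeNum property at all
      q.add(message("D", 500));

      InMemorySelectorSketch ms = new InMemorySelectorSketch(q);
      boolean startAtBeginning = true;
      Map<String, Object> msg;
      while ((msg = ms.getMessage(startAtBeginning, "SomeNum", 123)) != null)
      {
         System.out.println("Selected " + msg.get("id"));   // prints B, then D
         startAtBeginning = false;
      }
      System.out.println("Left on queue: " + q.size());     // prints 2
   }
}
```

Messages that do not match (A and C here) stay where they are, which is the whole point of browsing first and only doing the destructive get on a match.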

I could have just as easily created it in C, but I decided to do it in Java because I have seen people ask for a message selector for POJO applications.

You can download the source code for the test driver program called MQTest12MS from here, which includes the MessageSelector class. If you just want the MessageSelector class, you can download it from here.

Here is a snippet from MQTest12MS that shows you how to use the MessageSelector class.

ms = new MessageSelector(qMgr);
ms.openQueue(inputQName);
ms.setFilter("SomeNum", MessageSelector.Conditions.GREATER_THAN_EQUAL, 123);

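try
{
   while (true)
   {
      receiveMsg = ms.getMessage(startAtBeginning);

      // got the message, now go and do something with it.

      // set flag to continue rather than restart at the beginning.
      startAtBeginning = false;
   }
}
catch (MQException e)
{
   // 2033 (MQRC_NO_MSG_AVAILABLE) means all messages have been checked.
   if (e.reasonCode != CMQC.MQRC_NO_MSG_AVAILABLE)
      throw e;
}
ms.closeQueue();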

Here is the MessageSelector class.

import java.io.IOException;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Class Name
 *  MessageSelector
 *
 * Description
 *  This java class will retrieve messages from a queue based on a filter.
 *
 * @author Roger Lacroix
 * @version 1.0.0
 * @license Apache 2 License
 */
public class MessageSelector
{
   public enum Conditions
   {
      EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL;
   }
   
   private MQQueueManager  qMgr = null;
   private MQQueue         inQ = null;
   private String          filterName = null;
   private Conditions      filterCondition;
   private Object          filterValue = null;

   /**
    * The constructor
    * @param qMgr - must have a valid/active connection to the queue manager 
    */
   public MessageSelector(MQQueueManager qMgr)
   {
      super();
      this.qMgr = qMgr;
   }
   
   /**
    * Open the queue for both browsing and destructive gets.
    * @param qName
    * @throws MQException
    */
   public void openQueue(String qName) throws MQException
   {
      inQ = qMgr.accessQueue(qName, CMQC.MQOO_INQUIRE + CMQC.MQOO_BROWSE + CMQC.MQOO_FAIL_IF_QUIESCING + CMQC.MQOO_INPUT_SHARED);
   }
   
   /**
    * Close the queue.
    * @throws MQException
    */
   public void closeQueue() throws MQException 
   {
      if (inQ != null)
         inQ.close();
   }
   
   /**
    * Set the filter name, condition and value. 
    * @param name
    * @param condition
    * @param value
    * @throws IllegalArgumentException
    */
   public void setFilter(String name, Conditions condition, Object value) throws IllegalArgumentException
   {
      if (name == null)
         throw new IllegalArgumentException("Filter name cannot be null.");
      else if ("".equals(name))
         throw new IllegalArgumentException("Filter name cannot be blank.");
      else if (value == null)
         throw new IllegalArgumentException("Filter value cannot be null.");
      
      if ( (value instanceof String) || (value instanceof Boolean) ||
           (value instanceof Byte)   || (value instanceof Byte[])  )
       {
          if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) )
          {
             throw new IllegalArgumentException("Filter condition can only be EQUAL or NOT_EQUAL.");
          }
       }
       else if ( (value instanceof Integer) || (value instanceof Long) ||
                 (value instanceof Double)  || (value instanceof Float) )
       {
          if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) &&
               (Conditions.LESS_THAN != condition) && (Conditions.LESS_THAN_EQUAL != condition) &&
               (Conditions.GREATER_THAN != condition) && (Conditions.GREATER_THAN_EQUAL != condition) )
          {
             throw new IllegalArgumentException("Filter condition must be one of the following: EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL.");
          }
       }
       else
       {
          throw new IllegalArgumentException("Unknown Object type for Filter value.");
       }

      /**
       * Pass the checks, save the values
       */
      this.filterName = name;
      this.filterCondition = condition;
      this.filterValue = value;
   }

   /**
    * Retrieve the next matching message from the queue.
    * @param reset - Start over from the beginning of the queue.
    * @return
    * @throws MQException
    * @throws IOException
    */
   public MQMessage getMessage(boolean reset) throws MQException, IOException
   {
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      if (reset)
         gmo.options = CMQC.MQGMO_BROWSE_FIRST + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      else
         gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      MQMessage getMsg = null;

      while (true)
      {
         getMsg = new MQMessage();

         inQ.get(getMsg, gmo);

         if (performConditionalTest(getMsg))
         {
            deleteMessage();
            break;
         }

         gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      }
      
      return getMsg;
   }
   
   /**
    * Handle the conditional testing of the value.
    * @param getMsg
    * @return true/false
    */
   private boolean performConditionalTest(MQMessage getMsg)
   {
      boolean flag = false;

      try
      {
         if (filterValue instanceof String)
         {
            String value = getMsg.getStringProperty(filterName);
            if (value != null) 
            {
               if ( (Conditions.EQUAL == filterCondition) && (((String)filterValue).equals(value)) )
                  flag = true;
               else if ( (Conditions.NOT_EQUAL == filterCondition) && (!(((String)filterValue).equals(value))) )
                  flag = true;
            }
         }
         else if (filterValue instanceof Integer)
         {
            int value = getMsg.getIntProperty(filterName);
            
            if ( (Conditions.EQUAL == filterCondition) && (value == (Integer)filterValue) ) 
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Integer)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Integer)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Integer)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Integer)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Integer)filterValue) ) 
               flag = true;
         }
         else if (filterValue instanceof Long)
         {
            long value = getMsg.getLongProperty(filterName);
            
            if ( (Conditions.EQUAL == filterCondition) && (value == (Long)filterValue) ) 
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Long)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Long)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Long)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Long)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Long)filterValue) ) 
               flag = true;
         }
         else if (filterValue instanceof Double)
         {
            double value = getMsg.getDoubleProperty(filterName);
            
            if ( (Conditions.EQUAL == filterCondition) && (value == (Double)filterValue) ) 
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Double)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Double)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Double)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Double)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Double)filterValue) ) 
               flag = true;
         }
         else if (filterValue instanceof Float)
         {
            float value = getMsg.getFloatProperty(filterName);

            if ( (Conditions.EQUAL == filterCondition) && (value == (Float)filterValue) ) 
               flag = true;
            else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Float)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Float)filterValue) ) 
               flag = true;
            else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Float)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Float)filterValue) ) 
               flag = true;
            else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Float)filterValue) ) 
               flag = true;
         }
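         else if (filterValue instanceof Boolean)
         {
            boolean value = getMsg.getBooleanProperty(filterName);
            // compare by value and honor NOT_EQUAL as well as EQUAL
            boolean same = ((Boolean)filterValue).booleanValue() == value;
            flag = (Conditions.EQUAL == filterCondition) ? same : !same;
         }
         else if (filterValue instanceof Byte)
         {
            byte value = getMsg.getByteProperty(filterName);
            boolean same = ((Byte)filterValue).byteValue() == value;
            flag = (Conditions.EQUAL == filterCondition) ? same : !same;
         }
         else if (filterValue instanceof Byte[])
         {
            // unbox the Byte[] filter; casting it straight to byte[] throws ClassCastException
            Byte[] boxed = (Byte[])filterValue;
            byte[] expected = new byte[boxed.length];
            for (int i = 0; i < boxed.length; i++)
               expected[i] = boxed[i];

            byte[] value = getMsg.getBytesProperty(filterName);
            if (value != null)
            {
               boolean same = java.util.Arrays.equals(expected, value);
               flag = (Conditions.EQUAL == filterCondition) ? same : !same;
            }
         }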
      }
      catch (Exception e)
      {
         // property is missing or has an incompatible type: treat it as "no match"
      }

      return flag;
   }
   
   /**
    * Delete the message that the cursor is pointing to. 
    */
   private void deleteMessage()
   {
      MQMessage deleteMsg = new MQMessage();
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_MSG_UNDER_CURSOR + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_ACCEPT_TRUNCATED_MSG;

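      /** 
       * We already have the browsed copy of the message, so just
       * remove the message under the cursor from the queue.
       */
      try
      {
         inQ.get(deleteMsg, gmo, 1);  // only get 1 byte - who cares right!!  
      }
      catch (MQException e)
      {
         // a truncation warning is expected because only 1 byte is requested;
         // note that any other failure is silently ignored here as well
      }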
   }
}
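
The numeric branches of performConditionalTest repeat the same six comparisons for every type. As a possible simplification (not part of the downloadable class), the comparison could be pushed into one generic helper built on Comparable. The names ComparisonSketch and matches are hypothetical, and the Conditions enum is copied here only so the sketch compiles on its own.

```java
// ComparisonSketch is a hypothetical helper, not part of the original MessageSelector.
class ComparisonSketch
{
   // Copy of the MessageSelector.Conditions enum, repeated so the sketch is self-contained.
   enum Conditions
   {
      EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL;
   }

   /**
    * Evaluate "actual <condition> expected" for any type that is comparable to itself.
    * actual is the value read from the message; expected is the filter value.
    */
   static <T extends Comparable<T>> boolean matches(Conditions condition, T actual, T expected)
   {
      int c = actual.compareTo(expected);
      switch (condition)
      {
         case EQUAL:              return c == 0;
         case NOT_EQUAL:          return c != 0;
         case LESS_THAN:          return c <  0;
         case LESS_THAN_EQUAL:    return c <= 0;
         case GREATER_THAN:       return c >  0;
         case GREATER_THAN_EQUAL: return c >= 0;
         default:                 return false;
      }
   }

   public static void main(String[] args)
   {
      System.out.println(matches(Conditions.GREATER_THAN_EQUAL, 123, 123));  // prints true
      System.out.println(matches(Conditions.LESS_THAN, 99L, 100L));          // prints true
      System.out.println(matches(Conditions.NOT_EQUAL, "red", "red"));       // prints false
   }
}
```

One design caveat: Double.compareTo and Float.compareTo order NaN and -0.0 differently from the == and < operators used in the class above, so the two approaches are not identical for those edge values.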

Regards,
Roger Lacroix
Capitalware Inc.
