Oracle AQ: working with PL/SQL asynchronous notifications

I like Oracle Streams Advanced Queuing (AQ): it’s reliable and fast. I’ve been working with this technology for the last four years, on 10g and 11g database versions. Most of the time I’ve had to interact with Java EE systems through Java Message Service (JMS), which is fully supported by Oracle AQ. JMS has Message Driven Beans (MDB) as the standard way to consume messages in Java EE; as the counterpart on the Oracle database, you can register asynchronous notifications to PL/SQL procedures. To be perfectly honest, I’ve always considered the configuration of this functionality a bit tricky, because sometimes you don’t get an error, it simply doesn’t work. That’s why I’m writing this post: to present a simple example of PL/SQL asynchronous notifications that you can download from GitHub.

The first point that I’d like to deal with is the signature of the callback procedure, standalone or belonging to a package, that will consume the messages:

procedure receive_message_callback (
   context  raw,
   reginfo  sys.aq$_reg_info,
   descr    sys.aq$_descriptor,
   payload  raw,
   payloadl number
);

I think that the key here is the type of the payload argument: raw or varchar2, depending on the type of the message. I prepared my sample on my personal Oracle Database Express Edition 11g Release 2, where I couldn’t work with the Oracle AQ JMS types, so I used a custom data type as the payload of the messages, which implies that the payload argument type has to be raw. If you work, for example, with JMS Text Messages (SYS.AQ$_JMS_TEXT_MESSAGE in Oracle AQ), the payload argument type has to be varchar2.

The second issue is that the configuration needed varies depending on whether the destination is a queue or a topic.
In a queue each message is consumed by just one consumer, so you simply have to register the callback procedure. Here you have an excerpt of my code:

begin
   dbms_aqadm.create_queue_table (queue_table => 'queues_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T');

   dbms_aqadm.create_queue (queue_name => 'test_queue',
                            queue_table => 'queues_qt');

   dbms_aqadm.start_queue (queue_name => 'test_queue');

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_QUEUE',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here is not to forget the schema name prefix before queue and callback procedure names.

In a topic each message can be consumed by one or several subscribers, and each of them can process the message in a different way, for example by sending an email instead of processing it with a PL/SQL procedure. So you first have to register a subscriber, an agent in Oracle AQ terminology, and then register the PL/SQL consumer. Here you have an excerpt of my code:

begin
   -- It's a topic, so multiple_consumers parameter is specified.
   dbms_aqadm.create_queue_table (queue_table => 'topics_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T',
                                  multiple_consumers => true);

   dbms_aqadm.create_queue (queue_name => 'test_topic',
                            queue_table => 'topics_qt');

   dbms_aqadm.start_queue (queue_name => 'test_topic');

   dbms_aqadm.add_subscriber (queue_name => 'test_topic',
                              subscriber => sys.aq$_agent(
                                               name => 'demo_subscriber',
                                               address => null,
                                               protocol => 0));

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here is not to forget to put the name of the subscriber after the name of the topic, when you’re registering the callback procedure.
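
Both tips come down to how the strings passed to dbms_aq.register are put together. As a rough illustration only, the small Java helper below builds the three kinds of name used in the excerpts above. The class and method names are hypothetical, it only covers a callback that lives in a package, and upper-casing the parts simply mirrors the examples (it assumes the objects were created with unquoted identifiers).

```java
import java.util.Locale;

// Hypothetical helper: builds the names used when registering a callback.
final class AqRegistrationNames {

    private AqRegistrationNames() {
    }

    // SCHEMA.QUEUE: subscription name for a queue (single consumer).
    static String queueName(String schema, String queue) {
        return part("schema", schema) + "." + part("queue", queue);
    }

    // SCHEMA.TOPIC:SUBSCRIBER: subscription name for a topic (multiple consumers).
    static String subscriberName(String schema, String topic, String subscriber) {
        return queueName(schema, topic) + ":" + part("subscriber", subscriber);
    }

    // plsql://SCHEMA.PACKAGE.PROCEDURE: location of a packaged callback procedure.
    static String plsqlCallback(String schema, String pkg, String procedure) {
        return "plsql://" + part("schema", schema) + "." + part("package", pkg)
                + "." + part("procedure", procedure);
    }

    // Rejects missing parts, so a forgotten schema or subscriber fails loudly.
    private static String part(String label, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing " + label + " name");
        }
        return value.trim().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(queueName("testaq", "test_queue"));
        System.out.println(subscriberName("testaq", "test_topic", "demo_subscriber"));
        System.out.println(plsqlCallback("testaq", "test_p", "receive_message_callback"));
    }
}
```

The point of the private part method is that an empty schema or subscriber raises an exception immediately, instead of producing a name that registers without complaint and then never fires.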



Weblogic: receiving messages from a remote HornetQ destination

I’ve talked in former posts about setting up a JMS bridge between Weblogic and HornetQ and how to secure that connection, but the bridge is only appropriate for sending messages from Weblogic to HornetQ. The proper way to receive data sent to HornetQ queues in Weblogic is to implement a Message Driven Bean (MDB), as you can read in the Weblogic documentation. How do you connect an MDB deployed in Weblogic to a HornetQ destination? After some research, I think that the best answer is a Foreign Server.

First of all, I copied the following HornetQ client libraries to a folder on the filesystem of my Weblogic server:

  • hornetq-core-client.jar
  • hornetq-jms-client.jar
  • netty.jar
  • jnp-client.jar
  • jboss-logging.jar
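
A missing jar in that folder tends to show up later as a class loading error that is hard to trace back, so it can be worth checking the folder first. The following is a minimal sketch, not part of my original setup: the class name HornetQClientJars is hypothetical, the folder is whatever path you pass in, and the jar names are taken literally from the list above (real distributions may carry version suffixes).

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical check for the client libraries listed above.
final class HornetQClientJars {

    static final List<String> REQUIRED = Arrays.asList(
            "hornetq-core-client.jar",
            "hornetq-jms-client.jar",
            "netty.jar",
            "jnp-client.jar",
            "jboss-logging.jar");

    private HornetQClientJars() {
    }

    // Returns the required jars that are not present in libDir, in list order.
    static List<String> missing(Path libDir) {
        List<String> result = new ArrayList<>();
        for (String jar : REQUIRED) {
            if (!Files.isRegularFile(libDir.resolve(jar))) {
                result.add(jar);
            }
        }
        return result;
    }

    // Joins the jar paths with the platform path separator (":" or ";").
    static String classpath(Path libDir) {
        List<String> entries = new ArrayList<>();
        for (String jar : REQUIRED) {
            entries.add(libDir.resolve(jar).toString());
        }
        return String.join(File.pathSeparator, entries);
    }

    public static void main(String[] args) {
        Path libDir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println("Missing jars: " + missing(libDir));
        System.out.println("Classpath value: " + classpath(libDir));
    }
}
```

The joined string is the kind of value a classpath variable expects, so it can be pasted into the server start script once the list of missing jars is empty.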

Later on, I set the PRE_CLASSPATH variable in the setDomainEnv script, pointing it to these libraries, and started Weblogic. The next step was to create a new JMS System Module on the console, accepting the defaults. After that, I edited the module, set up a new Foreign Server with the defaults and entered its configuration:

Weblogic Foreign Server Main Configuration

The following step was to link the destinations (topics and queues) and the connection factories I had previously set up on HornetQ. To do this I used the wizards provided by the Weblogic console. It’s easy; I just have to point out that I had to edit the configuration of my connection factory to add the security credentials:

Details of a Foreign Connection Factory

Finally, I mapped the queue and the connection factory in my MDB. Here you have an example:

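import java.util.logging.Logger;

import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import weblogic.javaee.MessageDestinationConfiguration;
import weblogic.logging.LoggingHelper;

@MessageDriven(mappedName = "jms/Test/TestQueue")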
@MessageDestinationConfiguration(
         connectionFactoryJNDIName = "jms/Test/TestXAConnectionFactory")
public class TestMDB implements MessageListener {
    static final Logger logger = LoggingHelper.getServerLogger();

    public void onMessage(Message message) {
        TextMessage msg = null;

        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                logger.info("MESSAGE BEAN: Message received: " + msg.getText());
            } else {
                logger.warning("Message of wrong type: " + 
                                    message.getClass().getName());
            }
        } catch (JMSException e) {
            logger.severe("TestMDB.onMessage: JMSException: " + e.toString());
            e.printStackTrace();
        } catch (Throwable te) {
            logger.severe("TestMDB.onMessage: Exception: " + te.toString());
            te.printStackTrace();
        }
    }
}

Securing a JMS bridge between Weblogic and HornetQ

I posted my notes about setting up a JMS bridge between Weblogic and HornetQ last September. Now I’d like to talk about how to secure the connection: implementing a user authentication and authorization process and securing the communication between the servers with SSL. Finally, I’ll write about accessing the secured queue from a Message Driven Bean (MDB) deployed on JBoss 6.1. Most of the configuration is made on HornetQ, therefore I’ll make references to its User Manual.

The first step was to set up a security constraint, in order to ensure that only the user assigned to Weblogic 10.3 for the bridge could access the HornetQ queue. I had previously created a security role (testRole) and a user (testUser) belonging to it on JBoss. The procedure to do this may differ depending on the security manager you’re using, and it’s out of the scope of this post. I used the defaults for testing purposes and just had to write a line in each of the files %JBOSS_HOME%\server\default\conf\props\hornetq-users.properties and %JBOSS_HOME%\server\default\conf\props\hornetq-roles.properties.

Once I had configured the user and the role, I edited the file %JBOSS_HOME%\server\default\deploy\hornetq\hornetq-configuration.xml and added the following security setting:

<security-setting match="jms.queue.TestQueue">
  <permission type="consume" roles="testRole"/>
  <permission type="send" roles="testRole"/>
</security-setting>

Please read the security chapter of the HornetQ User Manual to learn about the different permissions that can be defined, addressing and wildcards.
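
To make the effect of that setting concrete, here is a deliberately simplified model in plain Java of what it expresses: a user may perform an action on an address only if one of the user’s roles is listed for that permission. This is not how HornetQ implements it. The class QueuePermissions and its methods are hypothetical, and the sketch only does exact address matches, with none of the wildcard support described in the manual.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of role-based permissions on an address.
final class QueuePermissions {

    // address match -> permission type -> roles allowed
    private final Map<String, Map<String, Set<String>>> settings = new HashMap<>();
    // user -> roles the user belongs to
    private final Map<String, Set<String>> userRoles = new HashMap<>();

    // Mirrors one <permission type="..." roles="..."/> line of a security setting.
    void allow(String match, String permission, String... roles) {
        settings.computeIfAbsent(match, k -> new HashMap<>())
                .computeIfAbsent(permission, k -> new HashSet<>())
                .addAll(Arrays.asList(roles));
    }

    // Mirrors one user-to-roles line of a roles file.
    void assign(String user, String... roles) {
        userRoles.computeIfAbsent(user, k -> new HashSet<>())
                .addAll(Arrays.asList(roles));
    }

    // True only if the user holds at least one role listed for the permission.
    boolean isAllowed(String user, String address, String permission) {
        Set<String> allowed = settings
                .getOrDefault(address, Collections.emptyMap())
                .getOrDefault(permission, Collections.emptySet());
        Set<String> roles = userRoles.getOrDefault(user, Collections.emptySet());
        return !Collections.disjoint(allowed, roles);
    }

    public static void main(String[] args) {
        QueuePermissions permissions = new QueuePermissions();
        permissions.allow("jms.queue.TestQueue", "consume", "testRole");
        permissions.allow("jms.queue.TestQueue", "send", "testRole");
        permissions.assign("testUser", "testRole");

        System.out.println(permissions.isAllowed("testUser", "jms.queue.TestQueue", "send"));
        System.out.println(permissions.isAllowed("otherUser", "jms.queue.TestQueue", "send"));
    }
}
```

In this model anything that is not explicitly allowed is denied, which is the behaviour I wanted for the bridge user: testUser can send and consume on TestQueue, and nobody else can.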

The next step was to modify the Weblogic bridge destination on the console, in order to set the user defined on JBoss. Here you have the excerpt from the resulting config.xml file:

<jms-bridge-destination>
  <name>JMS Bridge Destination-Target</name>
  <adapter-jndi-name>eis.jms.WLSConnectionFactoryJNDIXA</adapter-jndi-name>
  <user-name>testUser</user-name>
  <user-password-encrypted>{AES}GZTa15osrQW6/5rbE2fzm+a9BX/YPY7Hm3xiIFi0oMQ=</user-password-encrypted>
  <classpath></classpath>
  <connection-factory-jndi-name>jms/TestXAQueueConnectionFactory</connection-factory-jndi-name>
  <initial-context-factory>org.jnp.interfaces.NamingContextFactory</initial-context-factory>
  <connection-url>jnp://srvhornetq.test.local:1099</connection-url>
  <destination-jndi-name>jms/TestQueue</destination-jndi-name>
  <destination-type>Queue</destination-type>
</jms-bridge-destination>

The following issue was to secure the traffic between Weblogic and the JBoss server, where HornetQ was running as a JMS provider. To accomplish that goal, it was very useful to read the chapter about transports in the HornetQ User Manual. A prerequisite at this point was to prepare a self-signed certificate using keytool. The command I executed to create the client keystore was:

keytool -genkeypair -alias test -keyalg RSA -keysize 1024 -dname OU=TEST,CN=TEST,CN=LOCAL
        -keypass test1234 -keystore hornetq.test.keystore -storepass test1234

In order to create the trust keystore, I simply imported the client keystore into a new one:

keytool -importkeystore -srckeystore hornetq.test.keystore -destkeystore
         hornetq.test.truststore -srcstorepass test1234 -deststorepass test1234
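
To double-check what ended up in each store, the standard java.security.KeyStore API can list the aliases and tell key entries from certificate entries. The sketch below is only an illustration: the class name KeystoreInspector is hypothetical, and it assumes the store can be opened with the JVM’s default keystore type unless another type is passed as a third argument.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: lists the entries of a keystore and their kind.
final class KeystoreInspector {

    private KeystoreInspector() {
    }

    // Returns alias -> "key" or "certificate" for every entry in the store.
    static Map<String, String> describe(InputStream in, String type, char[] password)
            throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        store.load(in, password);

        Map<String, String> entries = new TreeMap<>();
        for (String alias : Collections.list(store.aliases())) {
            entries.put(alias, store.isKeyEntry(alias) ? "key" : "certificate");
        }
        return entries;
    }

    // Usage: java KeystoreInspector <file> <password> [type]
    public static void main(String[] args) throws Exception {
        String type = args.length > 2 ? args[2] : KeyStore.getDefaultType();
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            System.out.println(describe(in, type, args[1].toCharArray()));
        }
    }
}
```

Running it against hornetq.test.keystore and hornetq.test.truststore with the store password shows which aliases each file holds and whether they are key entries or plain certificates.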

I put these two keystores into the %JBOSS_HOME%\server\default\data\hornetq directory, so that the acceptor that listens for SSL connections could access them. I also copied hornetq.test.keystore to the file system of Weblogic, so that the connector could access it. Here you have an excerpt from the hornetq-configuration.xml file:

<connectors>
...
  <connector name="netty-ssl-connector">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
    <param key="host" value="${jboss.bind.address:localhost}"/>
    <param key="port" value="${hornetq.remoting.netty.port.ssl:5446}"/>
    <param key="ssl-enabled" value="true"/>
    <param key="key-store-path" value="/oracle/hornetq/hornetq/hornetq.test.keystore"/>
    <param key="key-store-password" value="test1234"/>
  </connector>
...
</connectors>

<acceptors>
...
  <acceptor name="netty-ssl-acceptor">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
    <param key="host" value="${jboss.bind.address:localhost}"/>
    <param key="port" value="${hornetq.remoting.netty.port.ssl:5446}"/>
    <param key="ssl-enabled" value="true"/>
    <param key="key-store-path" value="${jboss.server.data.dir}/hornetq/hornetq.test.keystore"/>
    <param key="key-store-password" value="test1234"/>
    <param key="trust-store-path" value="${jboss.server.data.dir}/hornetq/hornetq.test.truststore"/>
    <param key="trust-store-password" value="test1234"/>
  </acceptor>
...
</acceptors>

Finally, I deployed an MDB on JBoss that listened for the messages sent from an application running on Weblogic:

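import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name = "TestMDB", activationConfig = {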
  @ActivationConfigProperty(propertyName = "acknowledgeMode",
                            propertyValue = "Auto-acknowledge"),
  @ActivationConfigProperty(propertyName = "destinationType",
                            propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destination",
                            propertyValue = "jms/TestQueue")
})
public class TestMDB implements MessageListener {
  static final Logger logger = Logger.getLogger("TestMDB");

  public void onMessage(Message message) {
    TextMessage msg = null;

    try {
      if (message instanceof TextMessage) {
        msg = (TextMessage) message;
        logger.info("MESSAGE BEAN: Message received: " + msg.getText());
      } else {
        logger.warning("Message of wrong type: " + 
                               message.getClass().getName());
      }
    } catch (JMSException e) {
      logger.severe("TestMDB.onMessage: JMSException: " + e.toString());
      e.printStackTrace();
    } catch (Throwable te) {
      logger.severe("TestMDB.onMessage: Exception: " + te.toString());
      te.printStackTrace();
    }
  }
}

But how do you access a secured queue? The answer was to add the following fragment, with the user and password, to the jboss.xml file:

<enterprise-beans>
  <message-driven>
    <ejb-name>TestMDB</ejb-name>
    <mdb-user>testUser</mdb-user>
    <mdb-passwd>test1234</mdb-passwd>
  </message-driven>
</enterprise-beans>


Message Driven Beans & Oracle AQ

I’ve followed the chapter “Interoperating with Oracle AQ JMS” in order to integrate Oracle Streams Advanced Queuing (AQ) with Weblogic. Everything is working properly now, but I had some problems programming a Message Driven Bean (MDB) that consumes messages enqueued by a PL/SQL procedure, so I would like to specify the minimum configuration properties you need to deploy this type of bean, apart from the queue name, of course.

The first one is the destination type for the MDB destination, which should be configured as either javax.jms.Queue or javax.jms.Topic, and the second one is the connection factory JNDI name that you’ve configured on Weblogic. Here you have a basic example:

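import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import weblogic.javaee.MessageDestinationConfiguration;
import weblogic.logging.LoggingHelper;

@MessageDriven(mappedName = "aqjms/TestTopicQ",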
               activationConfig = {
   @ActivationConfigProperty(propertyName = "destinationType", 
                             propertyValue = "javax.jms.Topic")})
@MessageDestinationConfiguration(connectionFactoryJNDIName = "aqjms/TestTopicCF")
public class TestMDB implements MessageListener {
    static final Logger logger = LoggingHelper.getServerLogger();

    public void onMessage(Message message) {
        TextMessage msg = null;

        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                logger.info("MESSAGE BEAN: Message received: " + msg.getText());
            } else {
                logger.warning("Message of wrong type: " + 
                                    message.getClass().getName());
            }
        } catch (JMSException e) {
            logger.severe("TestMDB.onMessage: JMSException: " + e.toString());
            e.printStackTrace();
        } catch (Throwable te) {
            logger.severe("TestMDB.onMessage: Exception: " + te.toString());
            te.printStackTrace();
        }
    }
}

 

