Routing Oracle AQ messages using Apache Camel in ServiceMix: the XA option

I wrote in my last post about how to route messages between software based on Oracle Advanced Queuing (AQ) and Apache ActiveMQ, using Camel over ServiceMix. Today, I’d like to write about a different option from the ones I discussed there: the XA option.

There are situations where, even when an exception occurs, you have to guarantee that no message is lost and that no message is delivered twice. These are the situations where the X/Open XA standard comes into play.
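To make the guarantee concrete, here is a minimal sketch of the two-phase commit idea behind XA, written in plain Java. It is a teaching model only: the Participant interface and the RecordingParticipant class are hypothetical stand-ins for real XA resources, and nothing here is the Geronimo, Camel or Oracle implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative two-phase commit; a teaching sketch, not Geronimo's or Oracle's code. */
final class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for an XA resource (a JMS session, a database connection). */
    interface Participant {
        boolean prepare();  // phase 1: vote yes (true) or no (false)
        void commit();      // phase 2 when every vote was yes
        void rollback();    // phase 2 when at least one vote was no
    }

    /** Participant that remembers its last state so the outcome can be inspected. */
    static final class RecordingParticipant implements Participant {
        private final boolean vote;
        String state = "ACTIVE";

        RecordingParticipant(boolean vote) {
            this.vote = vote;
        }

        @Override
        public boolean prepare() {
            if (vote) {
                state = "PREPARED";
            }
            return vote;
        }

        @Override
        public void commit() {
            state = "COMMITTED";
        }

        @Override
        public void rollback() {
            state = "ROLLED_BACK";
        }
    }

    /** Runs both phases; returns true if the global transaction committed. */
    static boolean run(List<? extends Participant> participants) {
        boolean allVotedYes = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allVotedYes = false;
                break; // one "no" vote is enough to abort
            }
        }
        for (Participant p : participants) {
            if (allVotedYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allVotedYes;
    }

    public static void main(String[] args) {
        RecordingParticipant queue = new RecordingParticipant(true);
        RecordingParticipant database = new RecordingParticipant(false);
        boolean committed = run(Arrays.asList(queue, database));
        System.out.println("committed=" + committed
                + " queue=" + queue.state + " database=" + database.state);
    }
}
```

The point of the sketch is that nothing is committed anywhere until every participant has voted yes, which is what prevents both lost and duplicated messages when one side fails.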

I started my research on this issue by looking for documentation and examples. Two resources were very useful at this point: an Oracle white paper about XA and Oracle-controlled distributed transactions, which allowed me to review the XA topic, and a very good sample of a similar situation, written by Torsten Mielke, that shows how to use XA transactions with Camel across ActiveMQ and WebSphere MQ. You can find the resulting code of my XA test in my GitHub repository.

The test has a processor, copied from Torsten’s example, that simulates an exception and lets you see how the exchange is recovered. But the interesting issue here is trying to recover after an Oracle database crash or a network failure in the middle of a message exchange. I ran many tests. For the first ones I used an Oracle 11.2 test database on Microsoft Windows and Apache ServiceMix 6.0, also on Windows. In this environment, I issued shutdown abort and shutdown immediate commands to the database, and the Geronimo recovery manager didn’t recover the Oracle branches left in the prepared state, so I had to resolve the in-doubt transactions manually with commit force.

I wasn’t happy with that solution, which is not recommended by Oracle (read the white paper linked above) and isn’t transparent for the user. My final environment wasn’t Windows-based but Linux-based, so I ran further tests using another Oracle 11.2 database installed on Red Hat Linux and Apache ServiceMix 6.0, also on Linux. In this case, the Geronimo recovery manager was able to recover the prepared transactions and I couldn’t reproduce the error. An Oracle database configuration problem? An operating system issue? That’s what I suppose.
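As a rough illustration of what a recovery manager has to decide after a crash, the following plain Java sketch compares the branches a resource reports as still prepared with the coordinator’s own log. The branch identifiers are hypothetical strings, and the rule used here (roll back anything without a commit record, often called presumed abort) is an assumption made for the example; I haven’t checked how Geronimo actually implements it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative recovery decision; an assumption-laden sketch, not Geronimo's logic. */
final class XaRecoverySketch {

    /**
     * Decides what to do with each in-doubt branch reported by a resource.
     * A branch is committed only if the coordinator logged a commit decision for it.
     */
    static Map<String, String> resolve(List<String> inDoubtBranches, Set<String> committedInLog) {
        Map<String, String> actions = new LinkedHashMap<>();
        for (String branchId : inDoubtBranches) {
            actions.put(branchId, committedInLog.contains(branchId) ? "COMMIT" : "ROLLBACK");
        }
        return actions;
    }

    public static void main(String[] args) {
        // Hypothetical branch ids: the log only holds a commit record for tx-1.
        Map<String, String> actions =
                resolve(Arrays.asList("tx-1", "tx-2"), Collections.singleton("tx-1"));
        System.out.println(actions);
    }
}
```

When this step doesn’t happen automatically, as in my Windows tests, the branches stay in the prepared state on the database until somebody resolves them by hand.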


Routing Oracle AQ messages using Apache Camel in ServiceMix

I’ve been working with Apache ServiceMix lately and I expect to carry on doing so in the coming months. One of the issues I’ve faced is how to route messages between software based on Oracle Advanced Queuing (AQ) and other systems, using one of the main components of Apache ServiceMix: Camel. In this post, I’d like to start talking about the approach I’ve selected. I’m still working with XA, so this is just the beginning! The code of my proof of concept is on GitHub.

The first topic I had to study was how to deploy the libraries needed to work with Oracle AQ: aqapi.jar and ojdbc6.jar. I started by embedding the jar files in the bundle, but this approach implies that the libraries have to be attached to each bundle, so I finally decided to create a feature that wraps the files and converts them to OSGi bundles. This is the first module of my code repository, called feature.

Then I started to work on a basic connection, using the Camel JMS component and injecting an Oracle AQ connection factory bean. You can find my code in the second module of the project, called basic-test. But this solution is not scalable, because the component creates a JMS connection, which implies a JDBC one, each time a message is sent, and this is an expensive process, so I investigated the use of connection pools.

The question I had to study at this point was whether I should create a JDBC connection pool or a JMS one. I selected the second option because a JMS connection implies not only a JDBC connection, as I said before, but also other JMS API resources. A very, very basic benchmark in my test environment (Apache ServiceMix 6.0.0 and an Oracle 11.2.0 database, on virtual machines) took 3 seconds to send 100 messages in the performance test, versus 14 seconds in the basic one.
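The reason pooling helps can be shown with a small plain Java sketch. It is not the pool used in my repository: ConnectionPoolSketch is a hypothetical class, and a plain Object stands in for a JMS connection, so the only thing it demonstrates is that the creation cost is paid once per pooled connection instead of once per message.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Minimal fixed-size pool; a sketch of the idea, not the pool used in the repository. */
final class ConnectionPoolSketch<C> {
    private final BlockingQueue<C> idle;

    ConnectionPoolSketch(int size, Supplier<C> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // pay the creation cost once, up front
        }
    }

    /** Blocks until a pooled connection is free. */
    C borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
    }

    /** Returns a connection to the pool so the next sender can reuse it. */
    void giveBack(C connection) {
        idle.add(connection);
    }

    /** Sends the given number of messages through a pool and reports connections created. */
    static int connectionsCreatedFor(int messages, int poolSize) {
        AtomicInteger created = new AtomicInteger();
        ConnectionPoolSketch<Object> pool = new ConnectionPoolSketch<>(poolSize, () -> {
            created.incrementAndGet();
            return new Object(); // hypothetical stand-in for a JMS connection
        });
        for (int i = 0; i < messages; i++) {
            Object connection = pool.borrow();
            try {
                // a real sender would create a session and a producer here
            } finally {
                pool.giveBack(connection);
            }
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("connections created: " + connectionsCreatedFor(100, 2));
    }
}
```

Without the pool, the same loop would create one connection per message, which matches the behaviour I described for the basic test.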

On the other hand, the results of the message-receiving tests weren’t so impressive. I tested the competing consumers and asynchronous parameters (which cannot be used in all use cases). My configuration is in the performance-test module of the GitHub code repository, where you can play with the configuration parameters of the bundle and draw your own conclusions.


Oracle AQ: working with PL/SQL asynchronous notifications

I like Oracle Streams Advanced Queuing (AQ): it’s reliable and fast. I’ve been working with this technology for the last four years, with the 10g and 11g database versions. Most of the time I’ve had to interact with Java EE systems through Java Message Service (JMS), which is fully supported by Oracle AQ. JMS has Message Driven Beans (MDB) as the standard way to consume messages in Java EE; as their counterpart in the Oracle database, you can register asynchronous notifications to PL/SQL procedures. To be perfectly honest, I’ve always found the configuration of this functionality a bit tricky, because sometimes you don’t get an error, it simply doesn’t work. That’s why I’m writing this post: to present a simple example of PL/SQL asynchronous notifications that you can download from GitHub.

The first point I’d like to deal with is the signature of the callback procedure, standalone or belonging to a package, that will consume the messages:

procedure receive_message_callback (
   context  raw,
   reginfo  sys.aq$_reg_info,
   descr    sys.aq$_descriptor,
   payload  raw,
   payloadl number
);

I think the key here is the type of the payload argument: raw or varchar2, depending on the type of the message. I prepared my sample with my personal Oracle Database Express Edition 11g Release 2, where I couldn’t work with the Oracle AQ JMS types, so I used a custom data type as the payload of the messages, which implies that the payload argument has to be of type raw. If you work instead with, for example, JMS text messages (SYS.AQ$_JMS_TEXT_MESSAGE in Oracle AQ), the payload argument has to be of type varchar2.

The second issue is that the required configuration depends on whether the destination is a queue or a topic.
In a queue, each message is consumed by just one consumer, so you simply have to register the callback procedure. Here is an excerpt of my code:

begin
   dbms_aqadm.create_queue_table (queue_table => 'queues_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T');

   dbms_aqadm.create_queue (queue_name => 'test_queue',
                            queue_table => 'queues_qt');

   dbms_aqadm.start_queue (queue_name => 'test_queue');

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_QUEUE',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here is not to forget the schema name prefix before the queue name and the callback procedure name.

In a topic, each message can be consumed by one or several subscribers, and each of them can process the message in a different way, for example by sending an email instead of running a PL/SQL procedure. So you first have to add a subscriber, an agent in Oracle AQ terminology, and then register the PL/SQL consumer. Here is an excerpt of my code:

begin
   -- It's a topic, so multiple_consumers parameter is specified.
   dbms_aqadm.create_queue_table (queue_table => 'topics_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T',
                                  multiple_consumers => true);

   dbms_aqadm.create_queue (queue_name => 'test_topic',
                            queue_table => 'topics_qt');

   dbms_aqadm.start_queue (queue_name => 'test_topic');

   dbms_aqadm.add_subscriber (queue_name => 'test_topic',
                              subscriber => sys.aq$_agent(
                                               name => 'demo_subscriber',
                                               address => null,
                                               protocol => 0));

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here is not to forget to put the name of the subscriber after the name of the topic, when you’re registering the callback procedure.
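The two naming tips above can be summed up in a small helper. This is only a sketch in plain Java: the AqRegistrationNames class and its methods are hypothetical, and the uppercase conversion simply mirrors the form used in my excerpts; it just builds the strings that are passed to dbms_aq.register.

```java
import java.util.Locale;

/** Hypothetical helper that builds the names used when registering a PL/SQL callback. */
final class AqRegistrationNames {

    /** SCHEMA.QUEUE for a queue, SCHEMA.TOPIC:SUBSCRIBER for a topic. */
    static String subscriptionName(String schema, String queue, String subscriber) {
        String name = upper(schema) + "." + upper(queue);
        return subscriber == null ? name : name + ":" + upper(subscriber);
    }

    /** plsql://SCHEMA.PROCEDURE, or plsql://SCHEMA.PACKAGE.PROCEDURE for a packaged one. */
    static String callbackUrl(String schema, String packageName, String procedure) {
        String owner = packageName == null
                ? upper(schema)
                : upper(schema) + "." + upper(packageName);
        return "plsql://" + owner + "." + upper(procedure);
    }

    private static String upper(String identifier) {
        return identifier.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(subscriptionName("testaq", "test_queue", null));
        System.out.println(subscriptionName("testaq", "test_topic", "demo_subscriber"));
        System.out.println(callbackUrl("testaq", "test_p", "receive_message_callback"));
    }
}
```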





WebLogic: Interoperating with Oracle AQ JMS. A tip for sending messages

I usually work with Oracle Streams Advanced Queuing (AQ) integrated with WebLogic. In PL/SQL programs, when you send a message you can set the exception queue to which messages that couldn’t be delivered to their destination are moved, for example:


declare
  enq_msgid raw(16) ;
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;
  message   sys.aq$_jms_text_message;
begin
  mprop.priority        := 1;
  mprop.exception_queue := 'test_exception_q';
  message               := sys.aq$_jms_text_message.construct() ;
  message.set_text('This is a Test') ;
  dbms_aq.enqueue(queue_name => 'test_q', 
                  enqueue_options => eopt,
                  message_properties => mprop, 
                  payload => message, 
                  msgid => enq_msgid) ;

  commit;
end;
/

But how do you set the exception queue when you’re programming in Java, in the context of an integration between Oracle AQ and WebLogic? The solution I’ve found is to set a string property called JMS_OracleExcpQ on the message, with the name of the Oracle AQ exception queue as its value, for example:


...
javax.jms.TextMessage msg = jmsSession.createTextMessage();
msg.setStringProperty("JMS_OracleExcpQ", "TEST.TEST_EXCEPTION_Q");
msg.setText("This is a Test");
jmsProducer.send(msg);
...

An important tip: write the name in uppercase, preceded by the name of the schema.

Finally, I’d like to point out that this is a good solution in the context of an integration between Oracle AQ and WebLogic, but it makes the code non-portable if you ever want to change the messaging provider of your WebLogic application server.




Trying to set up a JMS bridge between HornetQ and Oracle AQ

I’ve been interested in setting up a JMS bridge between HornetQ and Oracle Advanced Queuing (AQ) for a while; I even wrote about this issue in a reply to a question on the HornetQ user forum. I revisit the matter in this post with some improvements, although, to be perfectly honest, I am not happy with the configuration.

In theory, the bridge is possible because both systems are JMS 1.1 compliant. If you use a HornetQ 2.2.5 standalone installation, you simply have to add some managed beans to the hornetq-beans.xml file, as described in the user manual. The key point here is to use the proper connection factory and set up the destination following the rules of the WebLogic manual: “Interoperating with Oracle AQ JMS”. Here is an excerpt of the hornetq-beans.xml file:

<bean name="JMSBridge">
  <constructor>
  ...
    <!-- concatenate JMS messageID to the target's message header -->
    <parameter>true</parameter>
  ...
</bean>
...
<!-- TargetCFF describes the ConnectionFactory used to connect to the target destination -->
<bean name="TargetCFF">
  <constructor>
    <parameter>
      <inject bean="TargetJNDI" />
    </parameter>
    <parameter>QueueConnectionFactory</parameter>
  </constructor>
</bean>
...
<!-- TargetDestinationFactory describes the Destination used as the target -->
<bean name="TargetDestinationFactory">
  <constructor>
    <parameter>
      <inject bean="TargetJNDI" />
    </parameter>
    <parameter>Queues/testQueue</parameter>
  </constructor>
</bean>
...
<!-- JNDI is a Hashtable containing the JNDI properties required -->
<!-- to connect to the *target* JMS resources                    -->
<bean name="TargetJNDI">
  <constructor>
    <map keyClass="java.lang.String" valueClass="java.lang.String">
      <entry>
        <key>java.naming.factory.initial</key>
        <value>oracle.jms.AQjmsInitialContextFactory</value>
      </entry>
      <entry>
        <key>db_url</key>
        <value>jdbc:oracle:thin:@oraclehost:1521:test</value>
      </entry>
      <entry>
        <key>java.naming.security.principal</key>
        <value>test</value>
      </entry>
      <entry>
        <key>java.naming.security.credentials</key>
        <value>secret</value>
      </entry>
    </map>
  </constructor>
</bean>

The previous configuration runs properly with the DUPLICATES_OK quality of service, but when you move to ONCE_AND_ONLY_ONCE, changing the TargetCFF to XAQueueConnectionFactory, you get a NullPointerException:

  oracle.jms.AQjmsException: Error creating the db_connection
    at oracle.jms.AQjmsDBConnMgr.getConnection
    at oracle.jms.AQjmsDBConnMgr.<init>
    at oracle.jms.AQjmsXAConnection.<init>
    at oracle.jms.AQjmsXAQueueConnectionFactory.createAllXAConnection
    at oracle.jms.AQjmsXAQueueConnectionFactory.createXAQueueConnection

After some research in Oracle Support notes, I found that the Oracle API has a bug (10102373) that will be fixed in the future release 12.1. There are workarounds to avoid receiving duplicate messages, for example saving the JMS Message ID in an Oracle table with a unique index on that column. Keep in mind that the JMS Message ID sent by HornetQ is a property that can be retrieved with the get_string_property method of the Oracle type aq$_jms_message.
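The duplicate-filtering workaround boils down to “insert the message ID and let the unique index reject repeats”. Here is a minimal plain Java sketch of that idea, in which an in-memory set plays the role of the Oracle table with the unique index; the DuplicateFilterSketch class and the sample IDs are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory model of the "unique index on the JMS Message ID" workaround. */
final class DuplicateFilterSketch {
    // Stands in for the Oracle table; a set rejects repeated values like a unique index.
    private final Set<String> seenMessageIds = ConcurrentHashMap.newKeySet();

    /** Returns true the first time an ID is seen, false for every redelivery. */
    boolean firstDelivery(String jmsMessageId) {
        return seenMessageIds.add(jmsMessageId);
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        String[] deliveries = {"ID:1", "ID:2", "ID:1"}; // hypothetical IDs, the third is a duplicate
        for (String id : deliveries) {
            System.out.println(id + (filter.firstDelivery(id) ? " processed" : " skipped"));
        }
    }
}
```

In the database version, the insert into the table and the processing of the message should happen in the same transaction, so that a failure rolls back both.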

But why can WebLogic interoperate with Oracle AQ in XA configurations? I suspect the reason is that it uses data sources with pre-created database connections. So I tried to deploy the bridge within JBoss AS 6.0.0, with HornetQ as its default JMS provider. First of all, I created an Oracle XA datasource. After that, I set up the configuration file jms-bridge-jboss-beans.xml in a similar way as before; the only change is to replace the db_url parameter with the datasource parameter:

<bean name="TargetJNDI">
  <constructor>
    <map keyClass="java.lang.String" valueClass="java.lang.String">
      <entry>
        <key>java.naming.factory.initial</key>
        <value>oracle.jms.AQjmsInitialContextFactory</value>
      </entry>
      <entry>
        <key>datasource</key>
        <value>jdbc/testOracleXA</value>
      </entry>
    </map>
  </constructor>
</bean>

But when I tried to run this configuration, the bridge didn’t start. Another bug, a HornetQ one in this case, prevented me from reaching my goal. To be perfectly honest, I didn’t patch the installation; instead I restarted the managed bean through the JMX console and got another exception, a ClassCastException, because JBoss passes a wrapped Oracle connection to the class AQjmsInitialContextFactory, which expects a native internal connection. I really felt powerless!

Finally, I adopted the solution of setting up an Oracle WebLogic server as a mediator. The guru Edwin Biemond describes this configuration in this article.


Message Driven Beans & Oracle AQ

I’ve followed the chapter “Interoperating with Oracle AQ JMS” in order to integrate Oracle Streams Advanced Queuing (AQ) with WebLogic. Everything is working properly now, but I had some problems programming a Message Driven Bean (MDB) that consumes messages enqueued by a PL/SQL procedure, so I would like to specify the minimum configuration properties you need to deploy this type of bean, apart from the queue name, of course.

The first one is the destination type of the MDB, which should be set to either javax.jms.Queue or javax.jms.Topic, and the second one is the JNDI name of the connection factory that you’ve configured in WebLogic. Here is a basic example:

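import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// WebLogic-specific annotation and logging helper
import weblogic.javaee.MessageDestinationConfiguration;
import weblogic.logging.LoggingHelper;

@MessageDriven(mappedName = "aqjms/TestTopicQ",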
               activationConfig = {
   @ActivationConfigProperty(propertyName = "destinationType", 
                             propertyValue = "javax.jms.Topic")})
@MessageDestinationConfiguration(connectionFactoryJNDIName = "aqjms/TestTopicCF")
public class TestMDB implements MessageListener {
    static final Logger logger = LoggingHelper.getServerLogger();

    public void onMessage(Message message) {
        TextMessage msg = null;

        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                logger.info("MESSAGE BEAN: Message received: " + msg.getText());
            } else {
                logger.warning("Message of wrong type: " + 
                                    message.getClass().getName());
            }
        } catch (JMSException e) {
            logger.severe("TestMDB.onMessage: JMSException: " + e.toString());
            e.printStackTrace();
        } catch (Throwable te) {
            logger.severe("TestMDB.onMessage: Exception: " + te.toString());
            te.printStackTrace();
        }
    }
}

 





Propagation of Oracle AQ messages between databases

I’ve been working with the Oracle Advanced Queuing feature over the last week, in order to use it in an interface between two applications that run on different databases. I’d like to share what I learned by showing a basic example of how to propagate messages between databases, because sometimes you have everything you need in a reference manual but it’s difficult to put it all together.

First of all, I’d like to point out that for this demo I have used an Oracle 11g database, called LOCAL, and an Oracle 10g database, called REMOTE, so messages will be propagated from LOCAL to REMOTE. Each of them has a user called TEST, which has the AQ_ADMINISTRATOR_ROLE role granted. I also set up a database link between the two databases, called TESTAQ.

The first step is to run this script in the LOCAL database (you can download all the scripts of this demo following this link):

-- Create the type for the payload. It must have the same structure as the one created
--    in the other database. Be careful with the character sets of both databases.
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue1_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages to be propagated.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue1', 
                           queue_table => 'aqdemo_queue1_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue1');
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber (queue_name => 'aqdemo_queue1', 
                             subscriber => sys.aq$_agent(name => 'aqdemo_queue1_subscriber', 
                                                         address => 'AQDEMO_QUEUE2@testaq', 
                                                         protocol => 0 ), 
                             queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name => 'TEST.AQDEMO_QUEUE1', 
                                  latency => 0, 
                                  destination => 'testaq', 
                                  destination_queue => 'TEST.AQDEMO_QUEUE2');
end;
/

The following step is to run this script in the REMOTE database:

create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue2_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true );
  -- Create the test queue, which will receive the messages from the queue on the LOCAL database.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue2', 
                           queue_table => 'aqdemo_queue2_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue2');
end;
/
-- Create a table to store the messages received.
create table aqdemo_queue2_message_t
  (received timestamp default systimestamp,
   message CLOB);
-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure aqdemo_queue2_callback_p
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload test_type;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  insert into aqdemo_queue2_message_t 
    (message) 
    values (o_payload.message);
  commit;
exception
  when others then
    rollback;
end;
/
begin
  -- Register the procedure for dequeuing the messages received.
  -- I'd like to point out that the subscriber is the one defined for the local database
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('TEST.AQDEMO_QUEUE2:AQDEMO_QUEUE1_SUBSCRIBER', 
                         dbms_aq.namespace_aq, 'plsql://TEST.AQDEMO_QUEUE2_CALLBACK_P', 
                         hextoraw('FF'))
                        ), 
        1);
end;
/

The last step is to run this script, which enqueues 100 test messages, in the LOCAL database:

declare
  enq_msgid raw(16);
  eopt dbms_aq.enqueue_options_t;
  mprop dbms_aq.message_properties_t;
begin
  mprop.priority := 1;
  for i in 1..100
  loop
    dbms_aq.enqueue(queue_name => 'aqdemo_queue1', 
                    enqueue_options => eopt, 
                    message_properties => mprop, 
                    payload => test_type('This is a remote test message: ' || 
                                         lpad(i, 3, 0)), 
                    msgid => enq_msgid);
  end loop;
  commit;
end;
/

Finally, we check the messages received in the REMOTE database by running this statement:

select *
from aqdemo_queue2_message_t
order by received;
