Oracle AQ: working with PL/SQL asynchronous notifications

I like Oracle Streams Advanced Queuing (AQ): it’s reliable and fast. I’ve been working with this technology for the last four years, on 10g and 11g databases. Most of the time I’ve had to interact with Java EE systems through Java Message Service (JMS), which is fully supported by Oracle AQ. In Java EE, Message Driven Beans (MDB) are the standard way to consume JMS messages; the counterpart on the Oracle database side is to register PL/SQL procedures for asynchronous notification. To be perfectly honest, I’ve always found configuring this functionality a bit tricky, because when something is wrong you often don’t get an error; it simply doesn’t work. That’s why I’m writing this post: to present a simple example of PL/SQL asynchronous notifications that you can download from GitHub.

The first point I’d like to deal with is the signature of the callback procedure, standalone or belonging to a package, that will consume the messages:

procedure receive_message_callback (
   context  raw,
   reginfo  sys.aq$_reg_info,
   descr    sys.aq$_descriptor,
   payload  raw,
   payloadl number
);

I think the key here is the type of the payload argument: raw or varchar2, depending on the type of the message. I prepared my sample on my personal Oracle Database Express Edition 11g Release 2, where I couldn’t work with the Oracle AQ JMS types, so I used a custom data type as the message payload, which means the payload argument has to be raw. If you work with JMS text messages instead (SYS.AQ$_JMS_TEXT_MESSAGE in Oracle AQ), the payload argument has to be varchar2.

The second issue is that the configuration you need depends on whether the destination is a queue or a topic.
In a queue each message is consumed by just one consumer, so you simply have to register the callback procedure. Here is an excerpt of my code:

begin
   dbms_aqadm.create_queue_table (queue_table => 'queues_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T');

   dbms_aqadm.create_queue (queue_name => 'test_queue',
                            queue_table => 'queues_qt');

   dbms_aqadm.start_queue (queue_name => 'test_queue');

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_QUEUE',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here: don’t forget to prefix both the queue name and the callback procedure name with the schema name.

In a topic each message can be consumed by one or several subscribers, and each of them can process the message in a different way, for example by sending an email instead of running a PL/SQL procedure. So you first have to add a subscriber (an agent, in Oracle AQ terminology) and then register the PL/SQL consumer for it. Here is an excerpt of my code:

begin
   -- It's a topic, so multiple_consumers parameter is specified.
   dbms_aqadm.create_queue_table (queue_table => 'topics_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T',
                                  multiple_consumers => true);

   dbms_aqadm.create_queue (queue_name => 'test_topic',
                            queue_table => 'topics_qt');

   dbms_aqadm.start_queue (queue_name => 'test_topic');

   dbms_aqadm.add_subscriber (queue_name => 'test_topic',
                              subscriber => sys.aq$_agent(
                                               name => 'demo_subscriber',
                                               address => null,
                                               protocol => 0));

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here: when you register the callback procedure, put the name of the subscriber after the name of the topic, separated by a colon.
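
Both tips boil down to how the strings passed to sys.aq$_reg_info are assembled. As a rough illustration only, here is a small Java sketch of a helper that a Java EE client or a deployment tool might use to build them. The class AqRegistrationNames and its methods are hypothetical; they are not part of Oracle AQ or of my sample. The uppercasing assumes that the schema, queue, subscriber and procedure were created with unquoted identifiers, which Oracle stores in uppercase.

```java
import java.util.Locale;

/**
 * Hypothetical helper (not part of Oracle AQ or of the sample code) that
 * assembles the names passed to sys.aq$_reg_info when registering a callback.
 */
final class AqRegistrationNames {

    private AqRegistrationNames() {
    }

    /** Registration name for a queue: SCHEMA.QUEUE */
    static String forQueue(String schema, String queue) {
        return upper(schema) + "." + upper(queue);
    }

    /** Registration name for a topic: SCHEMA.TOPIC:SUBSCRIBER */
    static String forTopic(String schema, String topic, String subscriber) {
        return forQueue(schema, topic) + ":" + upper(subscriber);
    }

    /** Callback URL: plsql://SCHEMA.PROCEDURE or plsql://SCHEMA.PACKAGE.PROCEDURE */
    static String callbackUrl(String schema, String procedure) {
        return "plsql://" + upper(schema) + "." + upper(procedure);
    }

    private static String upper(String identifier) {
        // Assumption: identifiers were created unquoted, so they are stored in uppercase.
        return identifier.trim().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(forQueue("testaq", "test_queue"));
        System.out.println(forTopic("testaq", "test_topic", "demo_subscriber"));
        System.out.println(callbackUrl("testaq", "test_p.receive_message_callback"));
    }
}
```

With the names used in this post, forTopic("testaq", "test_topic", "demo_subscriber") yields TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER, the value registered in the excerpt above.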




Changing the datatype of a column from VARCHAR2 to CLOB

Some days ago, I realized I hadn’t correctly estimated the size of a column in a table that receives data from an external source. I had defined it as VARCHAR2(4000 CHAR), but a longer message arrived (I got an ORA-01461 error), so I decided to move the column to a CLOB datatype.

At first sight, it seems a simple matter. For example, if you have the following table definition:

   create table test_messages
   (message_id number             primary key,
    content   varchar2(4000 char) not null);

You might think you could simply issue the following statement:

   alter table test_messages
   modify (content clob not null);

But, if you do that, you get an ORA-22296 error.

I had several options to solve the problem (see the reference for this post), but I finally chose the following one:

   alter table test_messages
   add (temp_content clob);

   update test_messages
   set temp_content = content;

   alter table test_messages
   drop column content;

   alter table test_messages
   rename column temp_content to content;

   alter table test_messages
   modify (content not null);

In any case, it’s very important to review all the application code affected by the change. For example, if you use the substr function, you have to change it to dbms_lob.substr, keeping in mind that its arguments are (lob, amount, offset), the reverse order of substr(string, position, length). Also, if you use JPA and you have an entity like:

   @Entity
   @Table(name = "TEST_MESSAGES")
   public class TestMessages implements Serializable {

       @Id
       @Column(name = "MESSAGE_ID")
       private Long messageId;

       @Column(name = "CONTENT", length = 4000, nullable = false)
       private String content;

       public Long getMessageId() {
           return messageId;
       }

       public void setMessageId(Long messageId) {
           this.messageId = messageId;
       }

       public String getContent() {
           return content;
       }

       public void setContent(String content) {
           this.content = content;
       }

   }

You have to change the definition of content to:

   @Lob
   @Column(name = "CONTENT", nullable = false)
   private String content;

Finally, I’d like to point out two issues that could affect performance. The first one is the possibility of deferring the removal of the original column by running the following statement:

   alter table test_messages
   set unused column content;

instead of:

   alter table test_messages
   drop column content;

Later, in a period of low use of the database, you can issue:

   alter table test_messages
   drop unused columns;

The other performance consideration is that it’s advisable to reorganize the storage of the table by running the following command:

   alter table test_messages move;

Because moving a table changes its rowids and leaves its indexes unusable, statements like alter index <index_name> rebuild could also be needed.


References


Weblogic: Interoperating with Oracle AQ JMS. A tip for sending messages

I usually work with Oracle Streams Advanced Queuing (AQ) integrated with Weblogic. In PL/SQL programs, when you send a message you can set the exception queue to which messages that can’t be delivered to their destination are moved. For example:


declare
  enq_msgid raw(16) ;
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;
  message   sys.aq$_jms_text_message;
begin
  mprop.priority        := 1;
  mprop.exception_queue := 'test_exception_q';
  message               := sys.aq$_jms_text_message.construct() ;
  message.set_text('This is a Test') ;
  dbms_aq.enqueue(queue_name => 'test_q', 
                  enqueue_options => eopt,
                  message_properties => mprop, 
                  payload => message, 
                  msgid => enq_msgid) ;

  commit;
end;

But how do you set the exception queue when you’re programming in Java, in the context of an integration between Oracle AQ and Weblogic? The solution I’ve found is to set a string property called JMS_OracleExcpQ on the message, with the name of the Oracle AQ exception queue as its value. For example:


...
javax.jms.TextMessage msg = jmsSession.createTextMessage();
msg.setStringProperty("JMS_OracleExcpQ", "TEST.TEST_EXCEPTION_Q");
msg.setText("This is a Test");
jmsProducer.send(msg);
...

An important tip: write the queue name in uppercase and prefix it with the name of the schema.

Finally, I’d like to point out that this is a good solution in the context of an integration between Oracle AQ and Weblogic, but it makes the code non-portable if you ever want to change the messaging provider of your Weblogic Application Server.



SQL statement to get the weeks before the current date

I recently had to design a report that analyzes one week of data, so the user had to select the week with an input control. The data source of the combo box was the following SQL statement, which runs on an Oracle Database:

select to_char(next_day(trunc(sysdate, 'DAY'), 'SATURDAY') - (level * 7) - 6,
               'mm/dd/yyyy')
       || ' - '
       || to_char(next_day(trunc(sysdate, 'DAY'), 'SATURDAY') - (level * 7),
                  'mm/dd/yyyy') week
from dual
connect by level <= 52
order by level

This code uses the hierarchical query clause CONNECT BY to generate one row per level, from 1 to 52, and computes the first and last day of each of the 52 weeks before the current one, which covers roughly a year.
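
If the same list has to be produced outside the database, for instance to test the report’s input control, the calculation can be sketched in Java with java.time. This is only an approximation under stated assumptions: the class PreviousWeeks and its labels method are hypothetical names, and weeks are taken to run from Sunday to Saturday, which is what trunc(sysdate, 'DAY') gives with a territory setting such as AMERICA. In the database, the first day of the week depends on NLS_TERRITORY, and the day name 'SATURDAY' depends on NLS_DATE_LANGUAGE.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the SQL statement; not part of the report. */
final class PreviousWeeks {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("MM/dd/yyyy");

    private PreviousWeeks() {
    }

    /**
     * Returns "start - end" labels for the weeks before the one that contains
     * the given date, most recent first.
     */
    static List<String> labels(LocalDate today, int weeks) {
        // Saturday that closes the week containing 'today'
        // (assumption: weeks run from Sunday to Saturday).
        LocalDate currentSaturday =
                today.with(TemporalAdjusters.nextOrSame(DayOfWeek.SATURDAY));

        List<String> result = new ArrayList<>();
        for (int level = 1; level <= weeks; level++) {
            LocalDate end = currentSaturday.minusWeeks(level); // Saturday of that week
            LocalDate start = end.minusDays(6);                // Sunday of that week
            result.add(FORMAT.format(start) + " - " + FORMAT.format(end));
        }
        return result;
    }

    public static void main(String[] args) {
        // Print the 52 weeks before the current one, like the SQL statement does.
        labels(LocalDate.now(), 52).forEach(System.out::println);
    }
}
```

For a Wednesday such as March 13, 2024, the first label is 03/03/2024 - 03/09/2024, the week immediately before the current one.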


Message Driven Beans & Oracle AQ

I followed the chapter “Interoperating with Oracle AQ JMS” to integrate Oracle Streams Advanced Queuing (AQ) with Weblogic. Everything works properly now, but I had some problems programming a Message Driven Bean (MDB) that consumes messages enqueued by a PL/SQL procedure, so I’d like to list the minimum configuration properties you need to deploy this type of bean, apart from the queue name, of course.

The first one is the destination type of the MDB destination, which should be set to either javax.jms.Queue or javax.jms.Topic. The second one is the JNDI name of the connection factory that you’ve configured on Weblogic. Here is a basic example:

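import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import weblogic.javaee.MessageDestinationConfiguration;
import weblogic.logging.LoggingHelper;

@MessageDriven(mappedName = "aqjms/TestTopicQ",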
               activationConfig = {
   @ActivationConfigProperty(propertyName = "destinationType", 
                             propertyValue = "javax.jms.Topic")})
@MessageDestinationConfiguration(connectionFactoryJNDIName = "aqjms/TestTopicCF")
public class TestMDB implements MessageListener {
    static final Logger logger = LoggingHelper.getServerLogger();

    public void onMessage(Message message) {
        TextMessage msg = null;

        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                logger.info("MESSAGE BEAN: Message received: " + msg.getText());
            } else {
                logger.warning("Message of wrong type: " + 
                                    message.getClass().getName());
            }
        } catch (JMSException e) {
            logger.severe("TestMDB.onMessage: JMSException: " + e.toString());
            e.printStackTrace();
        } catch (Throwable te) {
            logger.severe("TestMDB.onMessage: Exception: " + te.toString());
            te.printStackTrace();
        }
    }
}

 




Propagation of Oracle AQ messages between databases

Last week I was working with the Oracle Advanced Queuing feature in order to use it in an interface between two applications that run on different databases. I’d like to share what I learned by showing a basic example of how to propagate messages between databases, because sometimes the reference manual has everything you need but it’s difficult to put it all together.

First of all, I’d like to point out that for this demo I used an Oracle 11g database, called LOCAL, and an Oracle 10g database, called REMOTE, so messages are propagated from LOCAL to REMOTE. Each of them has a user called TEST, which has been granted the AQ_ADMINISTRATOR_ROLE role. I also set up a database link between the two databases, called TESTAQ.

The first step is to run this script in the LOCAL database (you can download all the scripts of this demo following this link):

-- Create the type for the payload. It must have the same structure as the one created
--    in the other database. Be careful with the character sets of both databases.
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue1_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages to be propagated.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue1', 
                           queue_table => 'aqdemo_queue1_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue1');
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber (queue_name => 'aqdemo_queue1', 
                             subscriber => sys.aq$_agent(name => 'aqdemo_queue1_subscriber', 
                                                         address => 'AQDEMO_QUEUE2@testaq', 
                                                         protocol => 0 ), 
                             queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name => 'TEST.AQDEMO_QUEUE1', 
                                  latency => 0, 
                                  destination => 'testaq', 
                                  destination_queue => 'TEST.AQDEMO_QUEUE2');
end;
/

The next step is to run this script in the REMOTE database:

create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue2_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true );
  -- Create the test queue, which will receive the messages from the queue on the LOCAL database.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue2', 
                           queue_table => 'aqdemo_queue2_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue2');
end;
/
-- Create a table to store the messages received.
create table aqdemo_queue2_message_t
  (received timestamp default systimestamp,
   message CLOB);
-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure aqdemo_queue2_callback_p
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload test_type;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  insert into aqdemo_queue2_message_t 
    (message) 
    values (o_payload.message);
  commit;
exception
  when others then
    -- Undo the dequeue so the message stays in the queue; note that the error itself is swallowed.
    rollback;
end;
/
begin
  -- Register the procedure for dequeuing the messages received.
  -- I'd like to point out that the subscriber is the one defined for the local database
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('TEST.AQDEMO_QUEUE2:AQDEMO_QUEUE1_SUBSCRIBER', 
                         dbms_aq.namespace_aq, 'plsql://TEST.AQDEMO_QUEUE2_CALLBACK_P', 
                         hextoraw('FF'))
                        ), 
        1);
end;
/

The last step is to run this script in the LOCAL database, which enqueues 100 test messages:

declare
  enq_msgid raw(16);
  eopt dbms_aq.enqueue_options_t;
  mprop dbms_aq.message_properties_t;
begin
  mprop.priority := 1;
  for i in 1..100
  loop
    dbms_aq.enqueue(queue_name => 'aqdemo_queue1', 
                    enqueue_options => eopt, 
                    message_properties => mprop, 
                    payload => test_type('This is a remote test message: ' ||
                                         lpad(i, 3, '0')),
                    msgid => enq_msgid);
  end loop;
  commit;
end;

Finally, we check the messages received in the REMOTE database by running this statement:

select *
from aqdemo_queue2_message_t
order by received
