Oracle AQ: working with PL/SQL asynchronous notifications

I like Oracle Streams Advanced Queuing (AQ), it’s reliable and fast. I’ve been working with this technology for the last four years, with 10g and 11g database versions; most of the time, I’ve had to interact with Java EE systems, through Java Message Service (JMS), which it’s fully supported by Oracle AQ. JMS has Message Driven Beans (MDB) as the standard way to consume messages in Java EE, as its counter part on the Oracle database you can register asynchronous notifications to PL/SQL procedures. To be perfectly honest, I’ve always considered that the configuration of this functionality is a bit tricky, because sometimes you don’t get an error, it simply doesn’t work. That’s why I’m writing this post, to present a simple example of PLSQL asynchronous notifications that you can download from github.

The first point that I’d like to deal is the sign of the callback procedure, standalone or belonging to a package, that will consume the messages:

procedure receive_message_callback (
   context  raw,
   reginfo  sys.aq$_reg_info,
   descr    sys.aq$_descriptor,
   payload  raw,
   payloadl number
);

I think that the key here is the type of the argument payload: raw or varchar2, depending on the type of the message. I’ve prepared my sample with my personal Oracle Database Express Edition 11g Release 2, where I couldn’t work with Oracle AQ JMS types, so I’ve used a custom data type as the payload of the messages, which implies that the payload argument type has to be raw, but if you work, for example, with JMS Text Messages, SYS.AQ$_JMS_TEXT_MESSAGE in Oracle AQ, the payload argument type has to be varchar2.

The second issue is that the configuration needed varies depending if the destination is a queue or a topic.
In a queue each message is consumed by just one consumer, so you simply has to register the callback procedure. Here you have an excerpt of my code:

begin
   dbms_aqadm.create_queue_table (queue_table => 'queues_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T');

   dbms_aqadm.create_queue (queue_name => 'test_queue',
                            queue_table => 'queues_qt');

   dbms_aqadm.start_queue (queue_name => 'test_queue');

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_QUEUE',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
      ),
      1
   );
end;
/

The tip to remember here is not to forget the schema name prefix before queue and callback procedure names.

In a topic each message can be consumed by one or several subscriptors and each of them can process the message in a different way, for example sending an email, instead of processing by a PL/SQL procedure. So, you first have to register a subscriber, an agent in Oracle AQ terminology, and then register the PLSQL consumer. Here you have an excerpt of my code:

begin
   -- It's a topic, so multiple_consumers parameter is specified.
   dbms_aqadm.create_queue_table (queue_table => 'topics_qt',
                                  queue_payload_type => 'TESTAQ.MESSAGES_T',
                                  multiple_consumers => true);

   dbms_aqadm.create_queue (queue_name => 'test_topic',
                            queue_table => 'topics_qt');

   dbms_aqadm.start_queue (queue_name => 'test_topic');

   dbms_aqadm.add_subscriber (queue_name => 'test_topic',
                              subscriber => sys.aq$_agent(
                                               name => 'demo_subscriber',
                                               address => null,
                                               protocol => 0));

   dbms_aq.register(
      sys.aq$_reg_info_list(
         sys.aq$_reg_info(
            'TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER',
            dbms_aq.namespace_aq,
            'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK',
            hextoraw('FF')
         )
     ),
     1
   );
end;
/

The tip to remember here is not to forget to put the name of the subscriber after the name of the topic, when you’re registering the callback procedure.


References


Advertisements

Changing the datatype of a column from VARCHAR2 to CLOB

Some days ago, I realized I hadn’t estimated correctly the size of a column of a table that receives data from an external source. I defined a VARCHAR2(4000 CHAR) datatype but it came a longer message  (I got an ORA-01461 error), so I decided to move the column to a CLOB datatype.

At first sight, it seems a simple matter. For example, if you have the following table definition:

   create table test_messages
   (message_id number             primary key,
    content   varchar2(4000 char) not null);

You would think to issue the following statement:

   alter table test_messages
   modify (content clob not null);

But, if you do that, you get an ORA-22296 error.

I had several options to solve the problem (please review the reference of this post), but I finally selected the following one:

   alter table test_messages
   add (temp_content clob);

   update test_messages
   set temp_content = content;

   alter table test_messages
   drop column content;

   alter table test_messages
   rename column temp_content to content;

   alter table test_messages
   modify (content not null);

Anyway, it’s very important to review all the application code affected by the change, because, for example, if you use the substr function, you have to change it to dbms_lob.substr. On the other hand, if you use JPA and you have an entity like:

   @Entity
   @Table(name = "TEST_MESSAGES")
   public class TestMessages implements Serializable {

       @Id
       @Column(name = "MESSAGE_ID")
       private Long messageId;

       @Column(name = "CONTENT", length = 4000, nullable = false)
       private String content;

       public Long getMessageId() {
           return messageId;
       }

       public void setMessageId(Long messageId) {
           this.messageId = messageId;
       }

       public String getContent() {
           return content;
       }

       public void setContent(String content) {
           this.content = content;
       }

   }

You have to change the definition of content to:

   @Lob
   @Column(name = "CONTENT", nullable = false)
   private String content;

Finally, I’d like to point out two issues that could affect to performance. The first one is the possibility of defer the removing of the initial column, running the following statement:

   alter table test_messages
   set unused column content;

instead of:

   alter table test_messages
   drop column content;

Later, in a period of low use of the database, you can issue:

   alter table test_messages
   drop unused columns;

The other thing regarding to performance is that it’s convenient reorganizing the storage of the table by running the following command:

   alter table test_messages move;

Statements like alter index <index_name> rebuild could also be needed.


References


Working with Oracle XMLType and JPA over Weblogic

I’ve been working with the object type of Oracle databases for XML for a long time, but always in the context of PL/SQL programming. Last week,  I developed a Java EE module that made use of JPA entities, supported by tables with SYS.XMLType columns, so I’d like to share my findings with an example.

First of all, here you have the creation sentence of a table that offers persistence to a simple messaging interface:


create table messages
  (
    message_id     number  constraint message_pk primary key,
    reception_date date    not null,
    msg_content    xmltype not null 
                           constraint message_msg_content_ck check(
                              xmlisvalid(msg_content) = 1)
  )
  xmltype column msg_content store as clob xmlschema
  "http://testdomain.com/Messages.xsd"
  element "Messages";

This is an excerpt of a JPA entity that could represent the former table:


package com.wordpress.fcosfc.test.messaging;

...

import org.eclipse.persistence.annotations.Customizer;

@Entity
@Table(name = "MESSAGES")
@Customizer(XMLTypeAttributeCustomizer.class)
public class Messages implements Serializable {

    @Id
    @Column(name = "MESSAGE_ID", nullable = false)
    private BigDecimal messageId;

    @Temporal(TemporalType.TIMESTAMP)
    @Column(name = "RECEPTION_DATE", nullable = false)
    private Date receptionDate;

    @Lob
    @Column(nullable = false, name ="msg_content")    
    private String msgContent;

    ...
}

I’m sure you’ve realized that there is an special annotation Customizer, which makes reference to this class:


package com.wordpress.fcosfc.test.messaging;

import org.eclipse.persistence.config.DescriptorCustomizer;
import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.mappings.xdb.DirectToXMLTypeMapping;

public class XMLTypeAttributeCustomizer implements DescriptorCustomizer {

    @Override
    public void customize(final ClassDescriptor descriptor) throws Exception {
        descriptor.removeMappingForAttributeName("msg_content");
        final DirectToXMLTypeMapping mapping = new DirectToXMLTypeMapping();
        mapping.setAttributeName("msg_content");
        mapping.setFieldName("MSG_CONTENT");
        mapping.getField().setColumnDefinition("sys.XMLTYPE");
        descriptor.addMapping(mapping);
    }

}

This class has a method that changes the default mapping of the EclipseLink JPA provider in order to use the proper type from Oracle: SYS.XMLType.

Finally, I’d like to point out that the first time I deployed my module over an Oracle Weblogic 10.3.5 application server I got a ClassNotFoundException: oracle/xdb/xmltype , so I had to extend my Weblogic domain in order to support Oracle XMType. Here you have my reference about how to do it, I just followed the instructions to support Oracle XDB.


References


Weblogic: Interoperating with Oracle AQ JMS. A tip for sending messages

I usually work with Oracle Streams Advanced Queuing (AQ) integrated with Weblogic. In PL/SQL programs you can set the exception queue where you want to move the messages that couldn’t be delivered to their destinations, when you send a message, for example:


declare
  enq_msgid raw(16) ;
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;
  message   sys.aq$_jms_text_message;
begin
  mprop.priority        := 1;
  mprop.exception_queue := 'test_exception_q';
  message               := sys.aq$_jms_text_message.construct() ;
  message.set_text('This is a Test') ;
  dbms_aq.enqueue(queue_name => 'test_q', 
                  enqueue_options => eopt,
                  message_properties => mprop, 
                  payload => message, 
                  msgid => enq_msgid) ;

  commit;
end;

But, how to set the exception queue when you’re programming in Java, in the context of an integration between Oracle AQ and Weblogic? The solution I’ve found is to set the string property called JMS_OracleExcpQ to the message with the name of the Oracle AQ exception queue, for example:


...
javax.jms.TextMessage msg = jmsSession.createTextMessage();
msg.setStringProperty("JMS_OracleExcpQ", "TEST.TEST_EXCEPTION_Q");
msg.setText("This is a Test");
jmsProducer.send(msg);
...

An important advice: put the name in uppercase and preceded by the name of the schema.

Finally, I’d like to point out that this is a good solution in the context of an integration between Oracle AQ and Weblogic, but makes the code non portable, if you want to change the messaging provider of your Weblogic Application Server.


References


Fighting with sockets!

Last week, I was appointed to develop a Java program that should connect to an external secure socket, in order to get data provided by a partner company. Another requisite was that the module should be stored on an Oracle 11g Database, so I must use a 1.5 JDK. Easy, I thought!

First of all, I review Java Secure Socket Extension (JSSE) Reference Guide. Our company partner IT team provided me with the key store containing the certificate I should trust and I decide to program a custom SSL context:

   ...
   KeyStore keyStoreTrust = KeyStore.getInstance("PKCS12");
   keyStoreTrust.load(this.getClass().getResourceAsStream("KeyStoreTrust.pfx"),
                      "password".toCharArray());
   TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("PKIX");
   trustManagerFactory.init(keyStoreTrust);

   SSLContext sslContext = SSLContext.getInstance("SSL");
   sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
   ...

The first problem arose when the server socket (developed in Microsoft .NET C#) unexpectedly closed the connection during the handshake, the support guy of my partner company said me that they got the following error message: “The client and server cannot communicate, because they do not possess a common algorithm”. Therefore, I delved into the problem and finally I realized that the server wanted to use a TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA cipher suite, which wasn’t supported by the security providers shipped with the JDK 1.5 I’d like to point out that a key point to find out the source of the error was to activate the debug of the SSL connection:

System.setProperty("javax.net.debug", "ssl");

Hence, I decided to add to my program the well-known Bouncy Castle security provider, which supports the required cipher suite and it’s 1.5 compliant:

Security.addProvider(new BouncyCastleProvider());

Once I sorted out the problem, everything started to work properly, at least as an stand-alone client! So, I created a “Loadjava and Java Stored Procedures” profile in my JDeveloper IDE, in order to deploy the software to the Oracle Database 11.2, but when I tried to do it I got the following errors:

Invoking loadjava on connection 'Test11g_Paco' with arguments:
 -order -resolve -definer -thin -resolver ((* TEST) (* PUBLIC) (* -)) -synonym
 errors   : class org/bouncycastle/jcajce/provider/asymmetric/ec/BCECPrivateKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPrivateKey

 errors   : class org/bouncycastle/jcajce/provider/asymmetric/ec/BCECPublicKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPublicKey

 errors   : class org/bouncycastle/jcajce/provider/asymmetric/ecgost/BCECGOST3410PrivateKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPrivateKey

 errors   : class org/bouncycastle/jcajce/provider/asymmetric/ecgost/BCECGOST3410PublicKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPublicKey

 errors   : class org/bouncycastle/jce/provider/JCEECPrivateKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPrivateKey

 errors   : class org/bouncycastle/jce/provider/JCEECPublicKey
 ORA-29552: verification warning: java.lang.NoClassDefFoundError: java/security/interfaces/ECPublicKey

 Loadjava finished.

I can’t understand the problem because the interfaces java.security.interfaces.ECPublicKey and java.security.interfaces.ECPrivateKey are available in 1.5 and the Oracle Database 11.2 JVM is supposed to be 1.5 compliant, but I couldn’t find any satisfactory solution.


Propagation of Oracle AQ messages between databases

I’ve been working on the Oracle Advanced Queueing feature last week, in order to use it in a interface between two applications that run on different databases, so I’d like to transmit my knowledge about this issue, showing a basic example about how to propagate messages between databases, because sometimes you have all that you need in a reference manual but it’s difficult to put all together.

First of all, I’d like to point out that I have used an Oracle 11g database, called LOCAL for this demo, and an Oracle 10g database, called REMOTE, so messages will be propagated from LOCAL to REMOTE. Each of them has a user called TEST, which has the role AQ_ADMINISTRATOR_ROLE granted. On the other hand, I setup a databases link between the two databases called TESTAQ.

The first step is to run this script in the LOCAL database (you can download all the scripts of this demo following this link):

-- Create the type for the payload. It must have the same structure that the one created
--    in the other database. Be careful with the character sets of both databases.
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue1_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages to be propagated.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue1', 
                           queue_table => 'aqdemo_queue1_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue1');
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber (queue_name => 'aqdemo_queue1', 
                             subscriber => sys.aq$_agent(name => 'aqdemo_queue1_subscriber', 
                                                         address => 'AQDEMO_QUEUE2@testaq', 
                                                         protocol => 0 ), 
                             queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name => 'TEST.AQDEMO_QUEUE1', 
                                  latency => 0, 
                                  destination => 'testaq', 
                                  destination_queue => 'TEST.AQDEMO_QUEUE2');
end;
/

The following step is to run this script in the REMOTE database:

create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue2_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true );
  -- Create the test queue, which will receive the messages from the queue on the LOCAL database.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue2', 
                           queue_table => 'aqdemo_queue2_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue2');
end;
/
-- Create a table to store the messages received.
create table aqdemo_queue2_message_t
  (received timestamp default systimestamp,
   message CLOB);
-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure aqdemo_queue2_callback_p
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload test_type;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  insert into aqdemo_queue2_message_t 
    (message) 
    values (o_payload.message);
  commit;
exception
  when others then
    rollback;
end;
/
begin
  -- Register the procedure for dequeuing the messages received.
  -- I'd like to point out that the subscriber is the one defined for the local database
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('TEST.AQDEMO_QUEUE2:AQDEMO_QUEUE1_SUBSCRIBER', 
                         dbms_aq.namespace_aq, 'plsql://TEST.AQDEMO_QUEUE2_CALLBACK_P', 
                         hextoraw('FF'))
                        ), 
        1);
end;
/

The last step is to run this script which enqueue 100 test messages in the LOCAL database:

declare
  enq_msgid raw(16);
  eopt dbms_aq.enqueue_options_t;
  mprop dbms_aq.message_properties_t;
begin
  mprop.priority := 1;
  for i in 1..100
  loop
    dbms_aq.enqueue(queue_name => 'aqdemo_queue1', 
                    enqueue_options => eopt, 
                    message_properties => mprop, 
                    payload => test_type('This is a remote test message: ' || 
                                         lpad(i, 3, 0)), 
                    msgid => enq_msgid);
  end loop;
  commit;
end;

Finally, we check the messages received in the REMOTE database running this statement:

select *
from aqdemo_queue2_message_t
order by received

References



Debugging Java Stored Procedures

I’ve been a fan of developing Java software stored in Oracle Databases for the last years, it’s the perfect partner of PL/SQL,  where this structured language is difficult to deal with or simply can’t achieve the goal.

One of my first problems was remote debugging, a common issue for every programmer.

First of all, I asked my DBA for the DEBUG CONNECT SESSION system privilege. When I got it, I had to configure my JDeveloper project for remote debugging, I started using the version 10g of this powerful IDE, so I changed the project properties in order to listen for JPDA (Java Platform Debugger Architecture) connections:

Project properties for remote debugging, JDeveloper 10g

Nowadays, I’m working with JDeveloper 11g, where I have to configure a Run/Debug/Profile within the project properties, clicking on the Remote Debugging check-box of the Launch Settings and adjusting the same parameters of the former version:

Project properties for remote debugging, JDeveloper 11g

Later on, the next step was to set the breakpoints I needed in the code, so I could go to the menu Run → Debug, in order to listen for remote debugging sessions. At this point, I was asked for the details of the connection:

Listening process parameters

Once I started the listening process, I could check for it checking the Run Manager.

The next step was to run SQL*Plus (nowadays I use SQLDeveloper), logging on the Database and calling the procedure dbms_debug_jdwp.connect_tcp with two VARCHAR2 parameters: the IP direction of my PC and  the port where my IDE was listening for JPDA connections.

After that, I started my debugging session calling the PL/SQL wrapper from SQL*Plus.

When I finished my debugging session, I ran the procedure dbms_debug_jdwp.disconnect from SQL*Plus and I used the Run Manager of JDeveloper to terminate the listening process.

Finally, I would like to talk of some problems I’ve suffered from: sometimes the debugger disconnects the session without any reason, others it disconnects when the Java code throws an exception in order to be managed by the calling PL/SQL, both times the Oracle Database connection is finished too.