Oracle AQ: working with PL/SQL asynchronous notificationsPosted: 12/17/2014
I like Oracle Streams Advanced Queuing (AQ), it’s reliable and fast. I’ve been working with this technology for the last four years, with 10g and 11g database versions; most of the time, I’ve had to interact with Java EE systems, through Java Message Service (JMS), which it’s fully supported by Oracle AQ. JMS has Message Driven Beans (MDB) as the standard way to consume messages in Java EE, as its counter part on the Oracle database you can register asynchronous notifications to PL/SQL procedures. To be perfectly honest, I’ve always considered that the configuration of this functionality is a bit tricky, because sometimes you don’t get an error, it simply doesn’t work. That’s why I’m writing this post, to present a simple example of PLSQL asynchronous notifications that you can download from github.
The first point that I’d like to deal is the sign of the callback procedure, standalone or belonging to a package, that will consume the messages:
procedure receive_message_callback ( context raw, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor, payload raw, payloadl number );
I think that the key here is the type of the argument payload: raw or varchar2, depending on the type of the message. I’ve prepared my sample with my personal Oracle Database Express Edition 11g Release 2, where I couldn’t work with Oracle AQ JMS types, so I’ve used a custom data type as the payload of the messages, which implies that the payload argument type has to be raw, but if you work, for example, with JMS Text Messages, SYS.AQ$_JMS_TEXT_MESSAGE in Oracle AQ, the payload argument type has to be varchar2.
The second issue is that the configuration needed varies depending if the destination is a queue or a topic.
In a queue each message is consumed by just one consumer, so you simply has to register the callback procedure. Here you have an excerpt of my code:
begin dbms_aqadm.create_queue_table (queue_table => 'queues_qt', queue_payload_type => 'TESTAQ.MESSAGES_T'); dbms_aqadm.create_queue (queue_name => 'test_queue', queue_table => 'queues_qt'); dbms_aqadm.start_queue (queue_name => 'test_queue'); dbms_aq.register( sys.aq$_reg_info_list( sys.aq$_reg_info( 'TESTAQ.TEST_QUEUE', dbms_aq.namespace_aq, 'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK', hextoraw('FF') ) ), 1 ); end; /
The tip to remember here is not to forget the schema name prefix before queue and callback procedure names.
In a topic each message can be consumed by one or several subscriptors and each of them can process the message in a different way, for example sending an email, instead of processing by a PL/SQL procedure. So, you first have to register a subscriber, an agent in Oracle AQ terminology, and then register the PLSQL consumer. Here you have an excerpt of my code:
begin -- It's a topic, so multiple_consumers parameter is specified. dbms_aqadm.create_queue_table (queue_table => 'topics_qt', queue_payload_type => 'TESTAQ.MESSAGES_T', multiple_consumers => true); dbms_aqadm.create_queue (queue_name => 'test_topic', queue_table => 'topics_qt'); dbms_aqadm.start_queue (queue_name => 'test_topic'); dbms_aqadm.add_subscriber (queue_name => 'test_topic', subscriber => sys.aq$_agent( name => 'demo_subscriber', address => null, protocol => 0)); dbms_aq.register( sys.aq$_reg_info_list( sys.aq$_reg_info( 'TESTAQ.TEST_TOPIC:DEMO_SUBSCRIBER', dbms_aq.namespace_aq, 'plsql://TESTAQ.TEST_P.RECEIVE_MESSAGE_CALLBACK', hextoraw('FF') ) ), 1 ); end; /
The tip to remember here is not to forget to put the name of the subscriber after the name of the topic, when you’re registering the callback procedure.
- Oracle Streams Advanced Queuing User’s Guide 11g Release 2
- Oracle Database PL/SQL Packages and Types Reference 11g Release 2
- Demo Oracle Advanced Queuing (AQ), PL/SQL asynchronous notifications