Propagation of Oracle AQ messages between databases
Posted: 07/14/2011
Last week I worked with the Oracle Advanced Queuing (AQ) feature to build an interface between two applications that run on different databases. I’d like to share what I learned with a basic example of how to propagate messages between databases, because sometimes a reference manual has everything you need but it’s difficult to put it all together.
For this demo I used an Oracle 11g database, called LOCAL, and an Oracle 10g database, called REMOTE; messages are propagated from LOCAL to REMOTE. Each database has a user called TEST that has been granted the role AQ_ADMINISTRATOR_ROLE. I also set up a database link between the two databases, called TESTAQ, which the LOCAL scripts use to reach REMOTE.
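The prerequisites described above could be set up roughly as follows. This is only a sketch: the password, the TNS alias 'REMOTE' and the extra CREATE DATABASE LINK grant are my assumptions and not part of the original demo, so adapt them to your environment.

```sql
-- Run as a privileged user (for example SYSTEM) on both LOCAL and REMOTE.
grant aq_administrator_role to test;

-- Assumption: TEST creates the link itself, so it needs this privilege on LOCAL.
grant create database link to test;

-- Run as TEST on LOCAL. The password and the TNS alias are placeholders.
create database link testaq
  connect to test identified by "change_me"
  using 'REMOTE';

-- Quick check that the link resolves and the remote login works.
select * from dual@testaq;
```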
The first step is to run this script in the LOCAL database (you can download all the scripts of this demo following this link):
-- Create the type for the payload. It must have the same structure as the one created
-- in the other database. Be careful with the character sets of both databases.
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a queue table for the payload type defined above.
  dbms_aqadm.create_queue_table (queue_table        => 'aqdemo_queue1_t',
                                 queue_payload_type => 'TEST.TEST_TYPE',
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages to be propagated.
  dbms_aqadm.create_queue (queue_name  => 'aqdemo_queue1',
                           queue_table => 'aqdemo_queue1_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue1');
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber (queue_name     => 'aqdemo_queue1',
                             subscriber     => sys.aq$_agent(name     => 'aqdemo_queue1_subscriber',
                                                             address  => 'AQDEMO_QUEUE2@testaq',
                                                             protocol => 0),
                             queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation (queue_name        => 'TEST.AQDEMO_QUEUE1',
                                   latency           => 0,
                                   destination       => 'testaq',
                                   destination_queue => 'TEST.AQDEMO_QUEUE2');
end;
/
The next step is to run this script in the REMOTE database:
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a queue table for the payload type defined above.
  dbms_aqadm.create_queue_table (queue_table        => 'aqdemo_queue2_t',
                                 queue_payload_type => 'TEST.TEST_TYPE',
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages from the queue on the LOCAL database.
  dbms_aqadm.create_queue (queue_name  => 'aqdemo_queue2',
                           queue_table => 'aqdemo_queue2_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue2');
end;
/
-- Create a table to store the messages received.
create table aqdemo_queue2_message_t
  (received timestamp default systimestamp,
   message  CLOB);
-- Create a callback procedure that dequeues the received message and saves it.
create or replace procedure aqdemo_queue2_callback_p
  (context  raw,
   reginfo  sys.aq$_reg_info,
   descr    sys.aq$_descriptor,
   payload  raw,
   payloadl number)
as
  r_dequeue_options    dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle     raw(26);
  o_payload            test_type;
begin
  -- Dequeue exactly the message this notification refers to.
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue (queue_name         => descr.queue_name,
                   dequeue_options    => r_dequeue_options,
                   message_properties => r_message_properties,
                   payload            => o_payload,
                   msgid              => v_message_handle);
  insert into aqdemo_queue2_message_t (message)
  values (o_payload.message);
  commit;
exception
  -- Note: any error is silently discarded here; the message stays in the queue.
  when others then
    rollback;
end;
/
begin
  -- Register the procedure for dequeuing the messages received.
  -- I'd like to point out that the subscriber is the one defined for the local database.
  dbms_aq.register (
    sys.aq$_reg_info_list(
      sys.aq$_reg_info('TEST.AQDEMO_QUEUE2:AQDEMO_QUEUE1_SUBSCRIBER',
                       dbms_aq.namespace_aq,
                       'plsql://TEST.AQDEMO_QUEUE2_CALLBACK_P',
                       hextoraw('FF'))
    ),
    1);
end;
/
The last step is to run this script in the LOCAL database, which enqueues 100 test messages:
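Once both scripts have run, it can be worth checking from LOCAL that the two payload types are compatible and that the propagation schedule is healthy. The following is a minimal sketch, assuming it is run as TEST in SQL*Plus (for `set serveroutput on`); the variable name `v_rc` is mine. `dbms_aqadm.verify_queue_types` returns 1 in `rc` when the source and destination queue types match and 0 when they do not.

```sql
set serveroutput on

declare
  v_rc binary_integer;
begin
  -- Compare the payload type of the local queue with the remote one over the link.
  dbms_aqadm.verify_queue_types (src_queue_name  => 'TEST.AQDEMO_QUEUE1',
                                 dest_queue_name => 'TEST.AQDEMO_QUEUE2',
                                 destination     => 'testaq',
                                 rc              => v_rc);
  dbms_output.put_line('Payload types compatible: ' ||
                       case v_rc when 1 then 'yes' else 'no' end);
end;
/

-- Show the propagation schedule and the last error, if any.
select qname, destination, schedule_disabled, failures, last_error_msg
  from user_queue_schedules;
```

If `failures` keeps growing, `last_error_msg` is usually the first place to look (for example, a broken database link or a type mismatch).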
declare
  enq_msgid raw(16);
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;
begin
  mprop.priority := 1;
  for i in 1..100 loop
    dbms_aq.enqueue (queue_name         => 'aqdemo_queue1',
                     enqueue_options    => eopt,
                     message_properties => mprop,
                     payload            => test_type('This is a remote test message: ' || lpad(i, 3, '0')),
                     msgid              => enq_msgid);
  end loop;
  -- The messages become visible to the propagation only after the commit.
  commit;
end;
/
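While the messages are being propagated, the state of the local queue can be inspected through the AQ$ view that Oracle creates for each queue table. This is just a sketch to run as TEST on LOCAL; the column alias is mine.

```sql
-- Count the messages in the local queue table per subscriber and state
-- (READY while waiting to be propagated, PROCESSED once they have been sent).
select consumer_name, msg_state, count(*) as messages
  from aq$aqdemo_queue1_t
 group by consumer_name, msg_state;
```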
Finally, we check the messages received in the REMOTE database running this statement:
select * from aqdemo_queue2_message_t order by received
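To repeat the demo from scratch, the objects on LOCAL can be removed with something like the script below. It is not part of the original scripts, only a sketch of one possible teardown; the REMOTE side would need an analogous cleanup (unregistering the callback and dropping its queue table and message table).

```sql
-- Run as TEST on LOCAL.
begin
  -- Stop propagating to the remote queue.
  dbms_aqadm.unschedule_propagation (queue_name        => 'TEST.AQDEMO_QUEUE1',
                                     destination       => 'testaq',
                                     destination_queue => 'TEST.AQDEMO_QUEUE2');
  -- With force => true the queue is stopped and dropped together with its table.
  dbms_aqadm.drop_queue_table (queue_table => 'aqdemo_queue1_t',
                               force       => true);
end;
/
```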
- Oracle Streams Advanced Queuing User’s Guide 11g Release 2
- Oracle Database PL/SQL Packages and Types Reference 11g Release 2
- Demo Oracle Advanced Queuing (AQ), propagation of messages between different databases