Propagation of Oracle AQ messages between databases

I’ve been working on the Oracle Advanced Queueing feature last week, in order to use it in a interface between two applications that run on different databases, so I’d like to transmit my knowledge about this issue, showing a basic example about how to propagate messages between databases, because sometimes you have all that you need in a reference manual but it’s difficult to put all together.

First of all, I’d like to point out that I have used an Oracle 11g database, called LOCAL for this demo, and an Oracle 10g database, called REMOTE, so messages will be propagated from LOCAL to REMOTE. Each of them has a user called TEST, which has the role AQ_ADMINISTRATOR_ROLE granted. On the other hand, I setup a databases link between the two databases called TESTAQ.

The first step is to run this script in the LOCAL database (you can download all the scripts of this demo following this link):

-- Create the type for the payload. It must have the same structure that the one created
--    in the other database. Be careful with the character sets of both databases.
create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue1_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true);
  -- Create the test queue, which will receive the messages to be propagated.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue1', 
                           queue_table => 'aqdemo_queue1_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue1');
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber (queue_name => 'aqdemo_queue1', 
                             subscriber => sys.aq$_agent(name => 'aqdemo_queue1_subscriber', 
                                                         address => 'AQDEMO_QUEUE2@testaq', 
                                                         protocol => 0 ), 
                             queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name => 'TEST.AQDEMO_QUEUE1', 
                                  latency => 0, 
                                  destination => 'testaq', 
                                  destination_queue => 'TEST.AQDEMO_QUEUE2');
end;
/

The following step is to run this script in the REMOTE database:

create or replace type test_type as object (message CLOB);
/
begin
  -- Create a table for queues of the type defined before.
  dbms_aqadm.create_queue_table (queue_table => 'aqdemo_queue2_t', 
                                 queue_payload_type => 'TEST.TEST_TYPE', 
                                 multiple_consumers => true );
  -- Create the test queue, which will receive the messages from the queue on the LOCAL database.
  dbms_aqadm.create_queue (queue_name => 'aqdemo_queue2', 
                           queue_table => 'aqdemo_queue2_t');
  -- Start the queue for enqueuing and dequeuing messages.
  dbms_aqadm.start_queue (queue_name => 'aqdemo_queue2');
end;
/
-- Create a table to store the messages received.
create table aqdemo_queue2_message_t
  (received timestamp default systimestamp,
   message CLOB);
-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure aqdemo_queue2_callback_p
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload test_type;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  insert into aqdemo_queue2_message_t 
    (message) 
    values (o_payload.message);
  commit;
exception
  when others then
    rollback;
end;
/
begin
  -- Register the procedure for dequeuing the messages received.
  -- I'd like to point out that the subscriber is the one defined for the local database
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('TEST.AQDEMO_QUEUE2:AQDEMO_QUEUE1_SUBSCRIBER', 
                         dbms_aq.namespace_aq, 'plsql://TEST.AQDEMO_QUEUE2_CALLBACK_P', 
                         hextoraw('FF'))
                        ), 
        1);
end;
/

The last step is to run this script which enqueue 100 test messages in the LOCAL database:

declare
  enq_msgid raw(16);
  eopt dbms_aq.enqueue_options_t;
  mprop dbms_aq.message_properties_t;
begin
  mprop.priority := 1;
  for i in 1..100
  loop
    dbms_aq.enqueue(queue_name => 'aqdemo_queue1', 
                    enqueue_options => eopt, 
                    message_properties => mprop, 
                    payload => test_type('This is a remote test message: ' || 
                                         lpad(i, 3, 0)), 
                    msgid => enq_msgid);
  end loop;
  commit;
end;

Finally, we check the messages received in the REMOTE database running this statement:

select *
from aqdemo_queue2_message_t
order by received

References


Advertisements

16 Comments on “Propagation of Oracle AQ messages between databases”

  1. patrice says:

    Hello Paco,

    Fantastic !! I have been pulling my hair out until now to discover that the callback uses the sender’s subscriber name ?????

    Where did you ever find this info ?

    Regardless, it works.

    Thanks a lot
    Patrice

    • fcosfc says:

      Hi Patrice,

      I’m happy to know I helped you. I reviewed the manuals referenced by this post and tried it until I succeed.

      Regards,

      Paco.

  2. Perer says:

    This is good information! Anyway, I have an issue where I am unable to propagate from source to multiple Virtual Databases (Delphix VDB). After 16 tries, the processes are stopped. I have looked in Oracle forum, looked in alert logs, and nothing. This was working and then just stopped when we added another virtual db. Have followed the master note from oracle to troubleshoot and still no success. Recreated the dblinks – which work. Verified using select to and from remote db.

    • fcosfc says:

      Hi Perer,

      I’m delighted you consider my post as good information. On the other hand, I don’t know anything about Delphix VDB, so I can’t help you, I’m sorry.

      Regards,

      Paco.

  3. srikanth says:

    Hi Paco,

    I have question …

    I understand we can remove the message from the REMOTE queue table/queue once we dequue the message. But how do we remove the message from the queue table in LOCAL database.

    I do not want the messages to be Piled up in the LOCAL database queue and I know that we can explicitly delete them, but I also want to make sure that the messages has been enquued in the REMOTE queue before deleting the LOCAL message.

    Any ideas?

    Thanks,
    Sri

    • fcosfc says:

      Hi Sri,

      Messages should be deleted from the LOCAL database when they are propagated to the REMOTE one, even before they are read by a remote subscriber.

      Regards,

      Paco.

      • sri says:

        Hi Paco,

        Thanks for the reply..
        As my requirement is a kind of an integration in pushing the data from LOCAL database to REMOTE database, and if I use the AQ functionality in creating the messages in LOCAL database queue and get is propagated to REMOTE database queue . There is a separate process to dequeue the messages from the REMOTE database queue.

        If the message in the LOCAL database queue is not automatically removed, and if we need to manually delete the message then I have couple of questions…

        1) Do the propagation of the message in REMOTE queue happens immediately as soon as we put the message in LOCAL queue?
        Because as soon I put the message in the LOCAL queue, I need to remove if there is no automatic way of deleting it…

        2) If we need to delete manually, then how can we make sure that the message is available at REMOTE queue?

        Thanks,
        Sri

  4. fcosfc says:

    Hi Sri,

    I’m sorry, I didn’t express myself properly, the database deletes the messages automatically when they are propagated. I put “messages should be deleted” because your system could have an error, but the normal behaviour is that messages are deleted automatically.

    Regards,

    Paco

  5. Stefan says:

    Hi Paco,

    sorry to reopen it after three years.

    I’m trying something similar and came up with your solution (couldn’t find a difference at least). Nevertheless it doesn’t seem to be what it should be:
    The USER function returns SYS instead of TEST. Did you notice something similar or do I miss something?

    Regards,
    Stefan

    • fcosfc says:

      Hi Stefan,

      Nothing to sorry about.

      To be perfectly honest, I don’t understand your problem. Please, could you be so kind as of detailing it a bit more?

      Regards,

      Paco.

      • Stefan says:

        Hi Paco,

        you are right. I was chasing ghosts!

        Let’s start at the beginning.

        There is an user LOKAL who owns QueueLokal and the Database Link.
        There is an user REMOTE who is adressed by the Database Link and owns QueueRemote.
        User REMOTE is owner of the CALLBACK-procedure.

        In my complex testcase I’m using a View with Oracle USER-function filtering rows.
        A call to “USER” in the CALLBACK-procedure suprised me with “SYS” (instead of “REMOTE”).

        I thought I had SYS-rights.

        Your comment “don’t understand your problem” made me re-think it:

        * USER returns “SYS”
        * You are accessing the QNS-tables (see Example “WhoAmIAndWhatDoIAccess()”)

        The question remains how to achieve “QNS” from USER or why “SYS” is obvious.

        Example:
        ( The user “REMOTE” reads “QNS” here
        ; AUTHID CURRENT_USER is toggled on/off defaulting to AUHID DEFINER
        ; Queueevent_CallBack() is registered with the REMOTE-Queue)

        CREATE TABLE qns.DebugInfo( dt timestamp, text VARCHAR2(2000) )
        /

        CREATE OR REPLACE PACKAGE qns.TestUserAndAccessRight
        AUTHID CURRENT_USER
        IS
        PROCEDURE WhoAmIAndWhatDoIAccess;
        PROCEDURE Queueevent_CallBack( context RAW, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor, payload RAW, payloadl NUMBER );
        END TestUserAndAccessRight;
        /

        CREATE OR REPLACE PACKAGE BODY qns.TestUserAndAccessRight
        IS

        PROCEDURE WhoAmIAndWhatDoIAccess
        IS PRAGMA AUTONOMOUS_TRANSACTION;
        BEGIN
        INSERT INTO QNS.DebugInfo( dt, text )
        WITH userTab AS
        ( SELECT DISTINCT first_value(table_name)OVER(ORDER BY num_rows DESC NULLS LAST, table_name)tab, nvl(first_value(num_rows)OVER(ORDER BY num_rows DESC NULLS LAST, table_name),-1)num FROM user_tables u )
        SELECT DISTINCT SYSTIMESTAMP, USER||’:’||a.Owner||’.’||tab
        FROM userTab u
        INNER JOIN all_tables a ON a.table_name=u.Tab AND nvl(a.NUM_ROWS,-1)=u.NUM
        ;
        COMMIT;
        END WhoAmIAndWhatDoIAccess;

        PROCEDURE Queueevent_CallBack( context RAW, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor, payload RAW, payloadl NUMBER )
        IS
        deq_options dbms_aq.dequeue_options_t;
        msg_properties dbms_aq.message_properties_t;
        msg_handle RAW(16);
        msg QNS.IOMELDUNG_QNS;
        BEGIN
        deq_options.msgid := descr.msg_id;
        deq_options.consumer_name := descr.consumer_name;
        dbms_aq.DEQUEUE( descr.queue_name, deq_options, msg_properties, msg, msg_handle )
        ;
        WhoAmIAndWhatDoIAccess;
        EXCEPTION
        WHEN OTHERS
        THEN ROLLBACK;
        END Queueevent_CallBack;

        END TestUserAndAccessRight;
        /

        Results with/without “AUTHID CURRENT_USER”

        => “SYS:QNS.TESTINFORMATION”

        Regards
        Stefan

  6. Suresh Kumar says:

    I have been trying this AQ propagation process since 1-2 weeks… I can see that message from local to remote queue propagages… but it doesn’t get dequeued from remote queue and I dont see any message into my message_table… i have written everything in a package and executed all the procedures.

    the callback and register procedures are as follows:

    PROCEDURE receiverCallback(a_context RAW,
    a_reginfo SYS.AQ$_REG_INFO,
    a_descr SYS.AQ$_DESCRIPTOR,
    a_payload RAW,
    a_payloadl NUMBER)
    AS
    b_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
    b_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    b_message_handle RAW(26);
    b_payload propmsg;
    BEGIN
    b_dequeue_options.msgid := a_descr.msg_id;
    b_dequeue_options.consumer_name := a_descr.consumer_name;
    DBMS_AQ.DEQUEUE(queue_name => a_descr.queue_name,
    dequeue_options => b_dequeue_options,
    message_properties => b_message_properties,
    payload => b_payload,
    msgid => b_message_handle);
    INSERT INTO receiver_message (message,received) VALUES(b_payload.message,SYSDATE);
    COMMIT;
    EXCEPTION
    WHEN OTHERS THEN
    –ROLLBACK;
    INSERT INTO receiver_message (message,received) VALUES(‘Error !!!’,SYSDATE);
    commit;
    END receiverCallback;

    /***************************************************************************************
    * Register the procedure for dequeuing the messages received.
    * The subscriber is the one defined for the local database.
    ***************************************************************************************/

    PROCEDURE registerSubscriber(a_user all_users.username%TYPE,
    a_queue user_queues.name%TYPE,
    a_subscriber VARCHAR2,
    a_callbackProc VARCHAR2)
    AS
    b_regInfo SYS.AQ$_REG_INFO;
    b_regList SYS.AQ$_REG_INFO_LIST;
    BEGIN
    b_regInfo := SYS.AQ$_REG_INFO(a_user||’.’||a_queue||’:’||a_subscriber,
    DBMS_AQ.NAMESPACE_AQ,
    ‘plsql://’||a_user||’.’||a_callbackProc,
    HEXTORAW(‘FF’));
    b_regList := SYS.AQ$_REG_INFO_LIST(b_regInfo);
    DBMS_AQ.REGISTER
    (
    reg_list => b_regList,
    reg_count => 1
    );
    COMMIT;

    DBMS_OUTPUT.PUT_LINE(‘Subscriber Registered: Name=’||b_regInfo.name||’ callback=’||b_regInfo.callback );

    END registerSubscriber;

    ——

    And I execute

    exec aqprop.registerSubscriber(‘VPM’,’REMOTEQ’,’HOSTQSUBSCRIBER’,’AQPROP.RECEIVERCALLBACK’)

    aqprop is the package name. I am not sure if I m missing something here. Please advise.

    • fcosfc says:

      Hi Suresh,

      This configuration is always tricky, because sometimes you don’t get an error, it simply doesn’t work, as it is your case.

      I think that the call to your registering proceduce is wrong, it should be:

      aqprop.registerSubscriber(‘VPM’,’REMOTEQ’,’HOSTQSUBSCRIBER’,
      ’VPM.AQPROP.RECEIVERCALLBACK’);

      Please, notice that the username should precede the package name.

      Another thing you should review is the type of the payload in the callback procedure sign: varchar or raw.

      Finally, I’d like to point out that I recently wrote about this issue: https://fcosfc.wordpress.com/2014/12/17/oracle-aq-working-with-plsql-asynchronous-notifications/

      Regards,

      Paco.

  7. Paco,
    This is great. I compared my code to yours and found what I thought was the missing link. When registering the callback program in the remote DB, I was not using the local DB’s subscriber. So I dropped everything, started all over, registered the callback in the remote DB with the local’s subscriber. Still, the callback program is not being invoked. My messages are replicated to the remote queue, but are not getting dequeued. I am ready to give up on this and create my own method to send messages from local to remote.
    Is there anything else you can think of to look at?
    I found a row in DBA_JOBS in the local DB that manages the propagation. But I do not see a row in the remote db version of DBA_JOBS. Is there something that is supposed to be running in the remote DB to fire off the notifications? Do you know where to look to see if the Dbms_Aq.Register in the remote DB actually worked?
    Thank you for your efforts to help others out.

    • fcosfc says:

      Hi Barry,

      To be perfectly honest, I don’t know if a row should be present in the remote DBA_JOBS, I wrote this post almost five years ago and I currently don’t have access to the systems I used to set up this configuration, in order to check out this issue.
      But, rows should be registered in the remote queues table if propagation is working and callback procedure not.
      I suggest you to review two issues:
      # The type of the payload argument for the callback procedure.
      # ¿Is it possible that the callback procedure is throwing any exceptions? In that case, messages should be present in the exception queues.
      Please review my post on asynchronous notifications: https://fcosfc.wordpress.com/2014/12/17/oracle-aq-working-with-plsql-asynchronous-notifications/

      Regards,

      Paco Saucedo.


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s