Pages

OracleEBSpro is purely for knowledge sharing and learning purpose, with the main focus on Oracle E-Business Suite Product and other related Oracle Technologies.

I'm NOT responsible for any damages in whatever form caused by the usage of the content of this blog.

I share my Oracle knowledge through this blog. All my posts in this blog are based on my experience, reading oracle websites, books, forums and other blogs. I invite people to read and suggest ways to improve this blog.


Friday, April 19, 2013

AQ


Advanced Queuing in Oracle

This article is a basic primer to get you started using Advanced Queuing in Oracle 9i and above.

Environment Setup

Administration and access privileges for advanced queuing are controled using two roles:
  • AQ_ADMINISTRATOR_ROLE - Allows creation and administration of queuing infrastructure.
  • AQ_USER_ROLE - Allows access to queues for enqueue and dequeue operations.
In the following examples I have used two schemas, one owning the queuing infrastructure and another with access to it.
CONNECT / AS SYSDBA
CREATE USER aq_admin IDENTIFIED BY aq_admin DEFAULT TABLESPACE users;
GRANT connect TO aq_admin;
GRANT create type TO aq_admin;
GRANT aq_administrator_role TO aq_admin;
ALTER USER aq_admin QUOTA UNLIMITED ON users;

CREATE USER aq_user IDENTIFIED BY aq_user DEFAULT TABLESPACE users;
GRANT connect TO aq_user;
GRANT aq_user_role TO aq_user;

Define Payload

The content, or payload, of a message is defined using an object type which must be defined before a queue is created.
CONNECT aq_admin/aq_admin

CREATE OR REPLACE TYPE event_msg_type AS OBJECT (
  name            VARCHAR2(10),
  current_status  NUMBER(5),
  next_status     NUMBER(5)
);
/

GRANT EXECUTE ON event_msg_type TO aq_user;

Create Queue Table And Queue

Once the payload type is created the queuing infrastructure can be created. Queues are implemented using a queue table which can hold multiple queues with the same payload type. First the queue table must be defined using the payload type, then the queue can be defined and started. These operations are all performed using the DBMS_AQADM package.
CONNECT aq_admin/aq_admin

EXECUTE DBMS_AQADM.create_queue_table ( -
   queue_table            =>  'aq_admin.event_queue_tab', -
   queue_payload_type     =>  'aq_admin.event_msg_type');

EXECUTE DBMS_AQADM.create_queue ( -
   queue_name            =>  'aq_admin.event_queue', -
   queue_table           =>  'aq_admin.event_queue_tab');

EXECUTE DBMS_AQADM.start_queue ( -
   queue_name         => 'aq_admin.event_queue', -
   enqueue            => TRUE);

Grant Privilege On Queue

The DBMS_AQADM package is also used to grant privileges on queues so that other users can access them.
CONNECT aq_admin/aq_admin

EXECUTE DBMS_AQADM.grant_queue_privilege ( -
   privilege     =>     'ALL', -
   queue_name    =>     'aq_admin.event_queue', -
   grantee       =>     'aq_user', -
   grant_option  =>      FALSE);
At this point the queue can be used for enqueue and dequeue operations by the AQ_USER user.

Enqueue Message

Messages can be written to the queue using the DBMS_AQ.ENQUEUE procedure.
CONNECT aq_user/aq_user

DECLARE
  l_enqueue_options     DBMS_AQ.enqueue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_event_msg           AQ_ADMIN.event_msg_type;
BEGIN
  l_event_msg := AQ_ADMIN.event_msg_type('REPORTER', 1, 2);

  DBMS_AQ.enqueue(queue_name          => 'aq_admin.event_queue',        
                  enqueue_options     => l_enqueue_options,     
                  message_properties  => l_message_properties,   
                  payload             => l_event_msg,             
                  msgid               => l_message_handle);

  COMMIT;
END;
/

Dequeue Message

Messages can be read from the queue using the DBMS_AQ.DEQUEUE procedure.
CONNECT aq_user/aq_user

SET SERVEROUTPUT ON

DECLARE
  l_dequeue_options     DBMS_AQ.dequeue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_event_msg           AQ_ADMIN.event_msg_type;
BEGIN
  DBMS_AQ.dequeue(queue_name          => 'aq_admin.event_queue',
                  dequeue_options     => l_dequeue_options,
                  message_properties  => l_message_properties,
                  payload             => l_event_msg,
                  msgid               => l_message_handle);

  DBMS_OUTPUT.put_line ('Event Name          : ' || l_event_msg.name);
  DBMS_OUTPUT.put_line ('Event Current Status: ' || l_event_msg.current_status);
  DBMS_OUTPUT.put_line ('Event Next Status   : ' || l_event_msg.next_status);
  COMMIT;
END;
/

Variations

The ENQUEUE_OPTIONS_TDEQUEUE_OPTIONS_T and MESSAGE_PROPERTIES_T types can be used to vary the way messages are enqueued and dequeued. This is where the real flexibility of advanced queuing becomes evident. The discussion of these options is beyond the scope of this article but they are discussed in the following links:
For more information see:

Credit goes to the below website(s) :

http://www.oracle-base.com/articles/9i/advanced-queuing-9i.php
----------------------------------------------------------------------------------------------
Database-A
-- Create a type object which will be the payload for queue.
CREATE OR REPLACE TYPE ge_aq_type AS OBJECT (
  customer_id          NUMBER,
  audit_customer_id    NUMBER
);

 --creating queue table
begin
  dbms_aqadm.create_queue_table(queue_table => 'ge_qtbl',
                                queue_payload_type => 'ge_aq_type',
                                multiple_consumers => true);
end;
-- Create Queue
begin
  dbms_aqadm.create_queue(queue_name => 'ge_test_aq',
                          queue_table => 'ge_qtbl');
end;
-- Start Queue
begin
  dbms_aqadm.start_queue(queue_name => 'ge_test_aq');
end;

begin
dbms_aqadm.add_subscriber(queue_name => 'ge_test_aq',
subscriber => sys.aq$_agent('ge_sub','ge_test_aq@sched.rgis.com',null));
end;


-- Schedule propagation
begin
  dbms_aqadm.schedule_propagation(queue_name => 'ge_test_aq',
                                  destination => 'sched.rgis.com',
                                  start_time => sysdate, latency => 1);
end;
/

select *
from dba_queues dq
where dq.name=upper('ge_test_aq');

-------------------------------------------
Database-B


-- Create a type object which will be the payload for queue.

CREATE OR REPLACE TYPE ge_aq_type AS OBJECT (
  customer_id          NUMBER,
  audit_customer_id    NUMBER
);

 --creating queue table
begin
  dbms_aqadm.create_queue_table(queue_table => 'ge_qtbl',
                                queue_payload_type => 'ge_aq_type',
                                multiple_consumers => true);
end;
-- Create Queue
begin
  dbms_aqadm.create_queue(queue_name => 'ge_test_aq',
                          queue_table => 'ge_qtbl');
end;
-- Start Queue
begin
  dbms_aqadm.start_queue(queue_name => 'ge_test_aq');
end;

begin
dbms_aqadm.add_subscriber(queue_name => 'ge_test_aq',
subscriber => sys.aq$_agent('ge_sub',null,null));
end;
/

select *
from dba_queues;

select *
from dba_queue_tables
where queue_table=upper('ge_qtbl');

select *
from ge_qtbl;

-------------------Test the Streams------------------------

Database-A

declare
  l_enqopt    dbms_aq.enqueue_options_t;
  l_msgprop   dbms_aq.message_properties_t;
  l_enq_msgid raw(40);
  l_payload   ge_aq_type;

begin
  l_payload := ge_aq_type(123123, 456456);

  dbms_aq.enqueue(queue_name         => 'ge_test_aq',
                  enqueue_options    => l_enqopt,
                  message_properties => l_msgprop,
                  payload            => l_payload,
                  msgid              => l_enq_msgid);
  dbms_output.put_line('l_enq_msgid = ' || l_enq_msgid);
end;
/

  select * from ge_qtbl;


2 comments:

  1. This comment has been removed by the author.

    ReplyDelete
  2. Thanks For Posting the Information about Oracle.We can Improve Your Knowledge come to Erptree Oracle Training.Oracle Fusion Cloud Financials Training

    ReplyDelete