Advanced Queuing in Oracle
This article is a basic primer to get you started using Advanced Queuing in Oracle 9i and above.
- Environment Setup
- Define Payload
- Create Queue Table And Queue
- Grant Privilege On Queue
- Enqueue Message
- Dequeue Message
- Variations
Environment Setup
Administration and access privileges for advanced queuing are controled using two roles:
AQ_ADMINISTRATOR_ROLE
- Allows creation and administration of queuing infrastructure.AQ_USER_ROLE
- Allows access to queues for enqueue and dequeue operations.
In the following examples I have used two schemas, one owning the queuing infrastructure and another with access to it.
CONNECT / AS SYSDBA CREATE USER aq_admin IDENTIFIED BY aq_admin DEFAULT TABLESPACE users; GRANT connect TO aq_admin; GRANT create type TO aq_admin; GRANT aq_administrator_role TO aq_admin; ALTER USER aq_admin QUOTA UNLIMITED ON users; CREATE USER aq_user IDENTIFIED BY aq_user DEFAULT TABLESPACE users; GRANT connect TO aq_user; GRANT aq_user_role TO aq_user;
Define Payload
The content, or payload, of a message is defined using an object type which must be defined before a queue is created.
CONNECT aq_admin/aq_admin CREATE OR REPLACE TYPE event_msg_type AS OBJECT ( name VARCHAR2(10), current_status NUMBER(5), next_status NUMBER(5) ); / GRANT EXECUTE ON event_msg_type TO aq_user;
Create Queue Table And Queue
Once the payload type is created the queuing infrastructure can be created. Queues are implemented using a queue table which can hold multiple queues with the same payload type. First the queue table must be defined using the payload type, then the queue can be defined and started. These operations are all performed using the
DBMS_AQADM
package.CONNECT aq_admin/aq_admin EXECUTE DBMS_AQADM.create_queue_table ( - queue_table => 'aq_admin.event_queue_tab', - queue_payload_type => 'aq_admin.event_msg_type'); EXECUTE DBMS_AQADM.create_queue ( - queue_name => 'aq_admin.event_queue', - queue_table => 'aq_admin.event_queue_tab'); EXECUTE DBMS_AQADM.start_queue ( - queue_name => 'aq_admin.event_queue', - enqueue => TRUE);
Grant Privilege On Queue
The
DBMS_AQADM
package is also used to grant privileges on queues so that other users can access them.CONNECT aq_admin/aq_admin EXECUTE DBMS_AQADM.grant_queue_privilege ( - privilege => 'ALL', - queue_name => 'aq_admin.event_queue', - grantee => 'aq_user', - grant_option => FALSE);
At this point the queue can be used for enqueue and dequeue operations by the AQ_USER user.
Enqueue Message
Messages can be written to the queue using the
DBMS_AQ.ENQUEUE
procedure.CONNECT aq_user/aq_user DECLARE l_enqueue_options DBMS_AQ.enqueue_options_t; l_message_properties DBMS_AQ.message_properties_t; l_message_handle RAW(16); l_event_msg AQ_ADMIN.event_msg_type; BEGIN l_event_msg := AQ_ADMIN.event_msg_type('REPORTER', 1, 2); DBMS_AQ.enqueue(queue_name => 'aq_admin.event_queue', enqueue_options => l_enqueue_options, message_properties => l_message_properties, payload => l_event_msg, msgid => l_message_handle); COMMIT; END; /
Dequeue Message
Messages can be read from the queue using the
DBMS_AQ.DEQUEUE
procedure.CONNECT aq_user/aq_user SET SERVEROUTPUT ON DECLARE l_dequeue_options DBMS_AQ.dequeue_options_t; l_message_properties DBMS_AQ.message_properties_t; l_message_handle RAW(16); l_event_msg AQ_ADMIN.event_msg_type; BEGIN DBMS_AQ.dequeue(queue_name => 'aq_admin.event_queue', dequeue_options => l_dequeue_options, message_properties => l_message_properties, payload => l_event_msg, msgid => l_message_handle); DBMS_OUTPUT.put_line ('Event Name : ' || l_event_msg.name); DBMS_OUTPUT.put_line ('Event Current Status: ' || l_event_msg.current_status); DBMS_OUTPUT.put_line ('Event Next Status : ' || l_event_msg.next_status); COMMIT; END; /
Variations
The
ENQUEUE_OPTIONS_T
, DEQUEUE_OPTIONS_T
and MESSAGE_PROPERTIES_T
types can be used to vary the way messages are enqueued and dequeued. This is where the real flexibility of advanced queuing becomes evident. The discussion of these options is beyond the scope of this article but they are discussed in the following links:
For more information see:
Credit goes to the below website(s) :
http://www.oracle-base.com/articles/9i/advanced-queuing-9i.php
----------------------------------------------------------------------------------------------
Database-A
-- Create a type object which will be the payload for queue.
CREATE OR REPLACE TYPE ge_aq_type AS OBJECT (
customer_id NUMBER,
audit_customer_id NUMBER
);
--creating queue table
begin
dbms_aqadm.create_queue_table(queue_table => 'ge_qtbl',
queue_payload_type => 'ge_aq_type',
multiple_consumers => true);
end;
-- Create Queue
begin
dbms_aqadm.create_queue(queue_name => 'ge_test_aq',
queue_table => 'ge_qtbl');
end;
-- Start Queue
begin
dbms_aqadm.start_queue(queue_name => 'ge_test_aq');
end;
begin
dbms_aqadm.add_subscriber(queue_name => 'ge_test_aq',
subscriber => sys.aq$_agent('ge_sub','ge_test_aq@sched.rgis.com',null));
end;
-- Schedule propagation
begin
dbms_aqadm.schedule_propagation(queue_name => 'ge_test_aq',
destination => 'sched.rgis.com',
start_time => sysdate, latency => 1);
end;
/
select *
from dba_queues dq
where dq.name=upper('ge_test_aq');
-------------------------------------------
Database-B
-- Create a type object which will be the payload for queue.
CREATE OR REPLACE TYPE ge_aq_type AS OBJECT (
customer_id NUMBER,
audit_customer_id NUMBER
);
--creating queue table
begin
dbms_aqadm.create_queue_table(queue_table => 'ge_qtbl',
queue_payload_type => 'ge_aq_type',
multiple_consumers => true);
end;
-- Create Queue
begin
dbms_aqadm.create_queue(queue_name => 'ge_test_aq',
queue_table => 'ge_qtbl');
end;
-- Start Queue
begin
dbms_aqadm.start_queue(queue_name => 'ge_test_aq');
end;
begin
dbms_aqadm.add_subscriber(queue_name => 'ge_test_aq',
subscriber => sys.aq$_agent('ge_sub',null,null));
end;
/
select *
from dba_queues;
select *
from dba_queue_tables
where queue_table=upper('ge_qtbl');
select *
from ge_qtbl;
-------------------Test the Streams------------------------
Database-A
declare
l_enqopt dbms_aq.enqueue_options_t;
l_msgprop dbms_aq.message_properties_t;
l_enq_msgid raw(40);
l_payload ge_aq_type;
begin
l_payload := ge_aq_type(123123, 456456);
dbms_aq.enqueue(queue_name => 'ge_test_aq',
enqueue_options => l_enqopt,
message_properties => l_msgprop,
payload => l_payload,
msgid => l_enq_msgid);
dbms_output.put_line('l_enq_msgid = ' || l_enq_msgid);
end;
/
select * from ge_qtbl;
This comment has been removed by the author.
ReplyDeleteThanks For Posting the Information about Oracle.We can Improve Your Knowledge come to Erptree Oracle Training.Oracle Fusion Cloud Financials Training
ReplyDelete