1.定義發送的對象類型(aq_msg_type ) aq_admin login
-- Create the type for the messages payload.
create or replace type aq_admin.aq_msg_type as object
(
msg_seq number(10),
msg_sender varchar2(10),
msg_receiver varchar2(10),
msg_level varchar2(10),
msg_content varchar2(100),
msg_create_time date
);
GRANT EXECUTE ON aq_admin.aq_msg_type to aq_user;
2.創建隊列及隊列表且启动之 aq_admin login
begin
--Create a table for queues
dbms_aqadm.create_queue_table (queue_table => 'aq_msg_qtab', queue_payload_type => 'AQ_ADMIN.AQ_MSG_TYPE');
--Create a test queue
dbms_aqadm.create_queue (queue_name => 'aq_msg_queue1', queue_table => 'aq_msg_qtab');
--Start the queue
dbms_aqadm.start_queue (queue_name => 'aq_msg_queue1');
end;
/
begin
dbms_aqadm.grant_queue_privilege (privilege => 'ALL',queue_name => 'AQ_ADMIN.AQ_MSG_QUEUE1',grantee => 'AQ_USER',grant_option => FALSE);
end;
3.入队 aq_user login
select count(1) from aq_admin.aq_msg_qtab;
-------------------------------------------------------
0
declare
v_queue_name varchar2(50);
v_enqueue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_admin.aq_msg_type;
begin
v_queue_name:='aq_admin.aq_msg_queue1';
v_aq_msg := aq_admin.aq_msg_type (1,'none','none','a','content 1',sysdate);
dbms_aq.enqueue(queue_name => v_queue_name,enqueue_options => v_enqueue_options,message_properties => v_message_properties,payload => v_aq_msg,msgid => v_message_handle);
commit;
end;
select count(1) from aq_admin.aq_msg_qtab;
-------------------------------------------------------
1
- 手工--浏览/出队 aq_user login
declare
v_queue_name varchar2(50);
v_dequeue_options dbms_aq.dequeue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_admin.aq_msg_type;
begin
v_queue_name:='aq_admin.aq_msg_queue1';
--如下 dbms_aq.browse 表示 浏览,无下行配置则出队列
v_dequeue_options.dequeue_mode := dbms_aq.browse;
dbms_aq.dequeue(queue_name => v_queue_name,dequeue_options => v_dequeue_options,message_properties => v_message_properties,payload => v_aq_msg,msgid => v_message_handle);
dbms_output.put_line('--------------------------------------- ');
dbms_output.put_line('msg_seq :' || v_aq_msg.msg_seq);
dbms_output.put_line('msg_sender :' || v_aq_msg.msg_sender);
dbms_output.put_line('msg_receiver :' || v_aq_msg.msg_receiver);
dbms_output.put_line('msg_level :' || v_aq_msg.msg_level);
dbms_output.put_line('msg_content :' || v_aq_msg.msg_content);
dbms_output.put_line('msg_create_time :' || to_char(v_aq_msg.msg_create_time,'yyyy-mm-dd hh24:mi:ss'));
dbms_output.put_line('current_time :' || to_char(v_aq_msg.msg_create_time,'yyyy-mm-dd hh24:mi:ss'));
commit;
end;
---------------------------------------
msg_seq :1
msg_sender :none
msg_receiver :none
msg_level :a
msg_content :content 1
msg_create_time :2020-11-08 00:02:54
current_time :2020-11-08 00:02:54
select count(1) from aq_admin.aq_msg_qtab;
-------------------------------------------------------
1
注释掉 “dbms_aq.browse” 所在的行后开始出队列操作后
select count(1) from aq_admin.aq_msg_qtab;
-------------------------------------------------------
0
- 自动--通知出队(aq_user login)
创建测试用的接收表
create sequence aq_msg_seq;
create table aq_msg_received
(
aq_msg_seq number,
queue_name varchar2(100),
received_time date,
content aq_admin.aq_msg_type,
consumer_name varchar2(512)
);
创建回调的包
create or replace package call_back_pck
as
procedure aq_msg_queue1
(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number
);
end call_back_pck;
/
create or replace package body call_back_pck
as
procedure aq_msg_queue1
(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number
)
is
r_dequeue_options dbms_aq.dequeue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle raw(26);
o_payload aq_admin.aq_msg_type;
begin
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,dequeue_options => r_dequeue_options,message_properties => r_message_properties,payload => o_payload,msgid => v_message_handle);
insert into aq_msg_received(aq_msg_seq,queue_name,received_time,content,consumer_name)
values (aq_msg_seq.nextval,descr.queue_name,sysdate,o_payload,descr.consumer_name);
commit;
exception
when others then
rollback;
end aq_msg_queue1;
end call_back_pck;
/
grant execute on call_back_pck to aq_admin;
6.增加回调注册
-- AQ_ADMIN login
begin
-- Register the procedure for dequeuing the messages received.
-- No subscriber is needed
dbms_aq.register
(
sys.aq$_reg_info_list(sys.aq$_reg_info('AQ_ADMIN.AQ_MSG_QUEUE1',dbms_aq.namespace_aq,'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE1',hextoraw('FF'))),1
);
end;
/
7.入队测试
declare
v_queue_name varchar2(50);
v_enqueue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_aq_msg aq_admin.aq_msg_type;
begin
v_queue_name:='aq_admin.aq_msg_queue1';
v_aq_msg := aq_admin.aq_msg_type (2,'none','none','a','content 2',sysdate);
dbms_aq.enqueue(queue_name => v_queue_name,enqueue_options => v_enqueue_options,message_properties => v_message_properties,payload => v_aq_msg,msgid => v_message_handle);
commit;
end;
8.验证数据
select t.aq_msg_seq,t.queue_name,t.received_time,t.consumer_name,t.content.msg_content,t.content.msg_create_time
from aq_msg_received t
---------------------------------------------------------------
AQ_MSG_SEQ QUEUE_NAME RECEIVED_TIME CONSUMER_NAME CONTENT.MSG_CONTENT CONTENT.MSG_CREATE_TIME
1 1 "AQ_ADMIN"."AQ_MSG_QUEUE1" 2020/11/9 20:31:44 content 2 2020/11/9 20:31:44
网友评论