背景
这里假设一个简单的需求: 某单位有三种角色 Level1, Level2, Level3; 消息也分三种类型 p1(正常), p2(紧急), pn(机密, 下文代码中写作 'p3'). 不同角色仅仅关心某些消息, 关系矩阵如下:
----- | p1 | p2 | pn |
---|---|---|---|
level1 | √ | √ | - |
level2 | - | √ | - |
level3 | - | - | √ |
下面写了几个demo来实现这个需求, 流程如下, 很简单
建立消息载体, 这里使用object(aq_admin.aq_msg_type)
建立queue table和queue
建立 subscriber (即consumer)
Enqueue, dequeue
1.clean up queue if exists(可选) aq_admin login
-- Drop the demo queue and its queue table.
-- This step is meant to be a "clean up if exists", so a missing queue or
-- queue table is tolerated; any other error is still raised.
declare
  l_qt varchar2(30) := 'aq_msg_qtab4';
  l_q  varchar2(30) := 'aq_msg_queue4';
  -- ORA-24010: QUEUE does not exist
  e_no_queue exception;
  pragma exception_init(e_no_queue, -24010);
  -- ORA-24002: QUEUE_TABLE does not exist
  e_no_queue_table exception;
  pragma exception_init(e_no_queue_table, -24002);
begin
  -- A queue must be stopped before it can be dropped.
  begin
    dbms_aqadm.stop_queue(queue_name => l_q);
    dbms_aqadm.drop_queue(queue_name => l_q);
  exception
    when e_no_queue then
      null; -- nothing to clean up
  end;
  -- Guarded separately so the table is still dropped when only the queue is missing.
  begin
    dbms_aqadm.drop_queue_table(queue_table => l_qt);
  exception
    when e_no_queue_table then
      null; -- nothing to clean up
  end;
end;
2.創建隊列及隊列表且启动之 aq_admin login
declare
  -- Names of the objects created by this step.
  c_queue_table constant varchar2(30) := 'aq_msg_qtab4';
  c_queue       constant varchar2(30) := 'aq_msg_queue4';
begin
  -- 1) Queue table that stores AQ_ADMIN.AQ_MSG_TYPE payloads.
  dbms_aqadm.create_queue_table(
    queue_table        => c_queue_table,
    queue_payload_type => 'AQ_ADMIN.AQ_MSG_TYPE',
    multiple_consumers => true, -- several subscribers may consume each message
    comment            => 'queue for aq_msg_queue4'
  );

  -- 2) The queue itself, living in the table created above.
  dbms_aqadm.create_queue(
    queue_name     => c_queue,
    queue_table    => c_queue_table,
    queue_type     => sys.dbms_aqadm.normal_queue,
    max_retries    => 3, -- retry count after a failed dequeue
    retry_delay    => 1, -- wait before a retry
    retention_time => 0  -- do not retain messages after dequeue
  );

  -- 3) Start the queue.
  dbms_aqadm.start_queue(queue_name => c_queue);
end;
/
-- Grant AQ_USER every privilege on the demo queue, without the right to re-grant.
begin
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ALL',
    queue_name   => 'AQ_ADMIN.AQ_MSG_QUEUE4',
    grantee      => 'AQ_USER',
    grant_option => false
  );
end;
3.根据不同需求建立订阅关系, 实现了背景需求中的矩阵 aq_admin login
-- (Re)create the three subscribers of the demo queue.
-- Any existing subscription is removed first so the block can be re-run.
declare
  l_q      varchar2(30) := 'aq_msg_queue4';
  l_level1 sys.aq$_agent := sys.aq$_agent(name => 'level1', address => null, protocol => 0);
  l_level2 sys.aq$_agent := sys.aq$_agent(name => 'level2', address => null, protocol => 0);
  l_level3 sys.aq$_agent := sys.aq$_agent(name => 'level3', address => null, protocol => 0);
  -- ORA-24035: AQ agent is not a subscriber for the queue
  e_not_subscriber exception;
  pragma exception_init(e_not_subscriber, -24035);

  -- Remove one subscriber, ignoring only "not a subscriber".
  -- Each removal is guarded on its own so that a missing level1 subscription
  -- does not prevent level2/level3 from being removed.
  procedure drop_subscriber(p_agent in sys.aq$_agent) is
  begin
    dbms_aqadm.remove_subscriber(queue_name => l_q, subscriber => p_agent);
  exception
    when e_not_subscriber then
      null;
  end drop_subscriber;
begin
  drop_subscriber(l_level1);
  drop_subscriber(l_level2);
  drop_subscriber(l_level3);

  -- level1 has no rule: it receives every message that is enqueued without an
  -- explicit recipient list (demo only; a real setup should use a rule too).
  dbms_aqadm.add_subscriber(queue_name => l_q, subscriber => l_level1);

  -- level2 is a rule-based subscriber interested in p2 and p3 messages.
  dbms_aqadm.add_subscriber(queue_name => l_q,
                            subscriber => l_level2,
                            rule       => 'tab.user_data.msg_level in (''p2'',''p3'')');

  -- level3 is a rule-based subscriber interested in p3 messages only.
  dbms_aqadm.add_subscriber(queue_name => l_q,
                            subscriber => l_level3,
                            rule       => 'tab.user_data.msg_level in (''p3'')');
end;
4.生成消息, 并且入队 aq_user login
-- Enqueue three demo messages (p1, p2, p3), one second apart.
-- The p3 message carries an explicit recipient list so that only level3 sees it.
declare
  c_queue      constant varchar2(50) := 'aq_admin.aq_msg_queue4';
  l_enq_opts   dbms_aq.enqueue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msgid      raw(16);
  l_recipients dbms_aq.aq$_recipient_list_t;

  -- Enqueue one payload with immediate visibility, print its id and commit.
  procedure send(p_payload in aq_admin.aq_msg_type) is
  begin
    l_enq_opts.visibility := dbms_aq.immediate;
    dbms_aq.enqueue(
      queue_name         => c_queue,
      enqueue_options    => l_enq_opts,
      message_properties => l_msg_props,
      payload            => p_payload,
      msgid              => l_msgid
    );
    dbms_output.put_line('encode success,msgid is ' || l_msgid);
    commit;
  end send;
begin
  send(aq_admin.aq_msg_type(11, '-', '-', 'p1', 'content priority 1', sysdate));
  dbms_lock.sleep(1);

  send(aq_admin.aq_msg_type(12, '-', '-', 'p2', 'content priority 2', sysdate));
  dbms_lock.sleep(1);

  -- Special handling: level1 has no rule and level2's rule matches p3, so both
  -- could receive this message. Naming level3 as the only recipient restricts
  -- delivery to level3.
  l_recipients(0) := sys.aq$_agent(name => 'level3', address => null, protocol => 0);
  l_msg_props.recipient_list := l_recipients;
  send(aq_admin.aq_msg_type(13, '-', '-', 'p3', 'content priority 3', sysdate));
end;
5.收取一个指定Consumer的所有消息. 这是一个主动获取的过程(Pull), 订阅者并没有得到通知 (aq_user login)
-- Drain every message currently available to one consumer (pull model).
-- Each message is dequeued without waiting, committed, and printed.
declare
  c_queue     constant varchar2(50) := 'aq_admin.aq_msg_queue4';
  c_consumer  constant varchar2(30) := 'level3';
  l_payload   aq_admin.aq_msg_type;
  l_msgid     raw(16);
  l_deq_opts  dbms_aq.dequeue_options_t;
  l_msg_props dbms_aq.message_properties_t;
  -- ORA-25228 is raised by dequeue when no (more) messages are available.
  e_queue_empty exception;
  pragma exception_init(e_queue_empty, -25228);
begin
  l_deq_opts.consumer_name := c_consumer;
  l_deq_opts.navigation    := dbms_aq.first_message;
  l_deq_opts.wait          := dbms_aq.no_wait;

  loop
    begin
      dbms_aq.dequeue(
        queue_name         => c_queue,
        dequeue_options    => l_deq_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
      );
      commit;

      dbms_output.put_line('--------------------------------------- ');
      dbms_output.put_line('msg_seq :' || l_payload.msg_seq);
      dbms_output.put_line('msg_sender :' || l_payload.msg_sender);
      dbms_output.put_line('msg_receiver :' || l_payload.msg_receiver);
      dbms_output.put_line('msg_level :' || l_payload.msg_level);

      -- After the first message, keep walking forward through the queue.
      l_deq_opts.navigation := dbms_aq.next_message;
    exception
      when e_queue_empty then
        exit; -- queue drained for this consumer
    end;
  end loop;
end;
----------------输出如下-------------------
msg_seq :13
msg_sender :-
msg_receiver :-
msg_level :p3
-- Count the rows still held in the queue table after the level3 dequeue shown above.
select count(1) from aq_msg_qtab4
-------------------------------------------------
2
6.清空消息队列(aq_admin login)
-- Remove every message from the demo queue table.
-- The queue is stopped for the duration of the purge and is always restarted,
-- even when the purge fails, so an error cannot leave it stopped.
DECLARE
  v_options sys.dbms_aqadm.aq$_purge_options_t;
  l_qt varchar2(30) := 'aq_msg_qtab4';
  l_q  varchar2(30) := 'aq_msg_queue4';
BEGIN
  SYS.DBMS_AQADM.STOP_QUEUE(QUEUE_NAME => l_q);
  BEGIN
    -- A NULL purge condition selects all messages in the queue table.
    dbms_aqadm.purge_queue_table(l_qt, NULL, v_options);
  EXCEPTION
    WHEN OTHERS THEN
      -- Restart before propagating the original error to the caller.
      SYS.DBMS_AQADM.START_QUEUE(QUEUE_NAME => l_q);
      RAISE;
  END;
  SYS.DBMS_AQADM.START_QUEUE(QUEUE_NAME => l_q);
END;
7.修改回调过程(aq_user login)
增加如下 3 个 过程
AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41
AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42
AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43
8.注册回调 aq_admin login
-- Register one PL/SQL callback per subscriber of AQ_ADMIN.AQ_MSG_QUEUE4.
declare
  -- User context handed to each registration.
  c_context constant raw(1) := hextoraw('FF');

  -- Register a single callback for the given "queue:consumer" subscription.
  procedure hook(p_subscription in varchar2, p_callback in varchar2) is
  begin
    dbms_aq.register(
      sys.aq$_reg_info_list(
        sys.aq$_reg_info(p_subscription, dbms_aq.namespace_aq, p_callback, c_context)
      ),
      1
    );
  end hook;
begin
  hook('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL1', 'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41');
  hook('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL2', 'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42');
  hook('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL3', 'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43');
end;
9.入队测试(aq_user)
重复上面的步骤4
10.验证数据
-- List what each callback stored, grouped by consumer and then by sequence.
-- Ordering by column name instead of position keeps the sort correct if the
-- select list is ever reordered.
select t.aq_msg_seq,
       t.queue_name,
       t.consumer_name,
       t.content.msg_content
from aq_msg_received t
order by t.consumer_name, t.aq_msg_seq
---------------------------------------------------------------------------------------------------
AQ_MSG_SEQ QUEUE_NAME CONSUMER_NAME CONTENT.MSG_CONTENT
8 "AQ_ADMIN"."AQ_MSG_QUEUE4" aq_msg_queue41->LEVEL1 content priority 1
10 "AQ_ADMIN"."AQ_MSG_QUEUE4" aq_msg_queue41->LEVEL1 content priority 2
9 "AQ_ADMIN"."AQ_MSG_QUEUE4" aq_msg_queue42->LEVEL2 content priority 2
11 "AQ_ADMIN"."AQ_MSG_QUEUE4" aq_msg_queue43->LEVEL3 content priority 3
网友评论