美文网首页
Oracle Advanced Queuing 之05(多订阅人

Oracle Advanced Queuing 之05(多订阅人)

作者: 轻飘飘D | 来源:发表于2020-11-14 10:58 被阅读0次

    背景
    这里假设一个简单的需求, 某单位有三种角色, Level1, Level2, Level3, 消息也分三种类型, p1(正常), p2(紧急), pn(机密). 不同角色仅仅关心某些信息,关系矩阵如下

    (依据下文步骤10的实际投递结果整理; ✓ = 接收, - = 不接收; pn 在代码中对应消息级别 'p3')
    ------ p1 p2 pn
    level1 ✓  ✓  -
    level2 -  ✓  -
    level3 -  -  ✓

    下面写了几个demo来实现这个需求, 流程如下, 很简单
    建立消息载体, 这里使用object(aq_admin.aq_msg_type)
    建立queue table和queue
    建立 subscriber (即consumer)
    Enqueue, dequeue

    1.clean up queue if exists(可选) aq_admin login

    -- Drop the demo queue table together with its queue, if they exist.
    -- force => true makes drop_queue_table stop and drop every queue stored in
    -- the table, so separate stop_queue/drop_queue calls are not needed.
    declare
      l_qt varchar2(30) := 'aq_msg_qtab4';
      e_no_queue_table exception;
      -- ORA-24002: QUEUE_TABLE does not exist
      pragma exception_init(e_no_queue_table, -24002);
    begin
      dbms_aqadm.drop_queue_table(queue_table => l_qt, force => true);
    exception
      when e_no_queue_table then
        -- Nothing to clean up (e.g. first run); any other error still propagates.
        null;
    end;
    /
    

    2.創建隊列及隊列表且启动之 aq_admin login

    -- Build the queue table, create the queue on top of it, then start the queue.
    declare
      c_queue_table constant varchar2(30) := 'aq_msg_qtab4';
      c_queue_name  constant varchar2(30) := 'aq_msg_queue4';
    begin
      -- Queue table holding AQ_ADMIN.AQ_MSG_TYPE payloads.
      dbms_aqadm.create_queue_table(
        queue_table        => c_queue_table,
        queue_payload_type => 'AQ_ADMIN.AQ_MSG_TYPE',
        multiple_consumers => true,  -- multi-consumer queue table
        comment            => 'queue for aq_msg_queue4'
      );

      -- The demo queue itself.
      dbms_aqadm.create_queue(
        queue_name     => c_queue_name,
        queue_table    => c_queue_table,
        queue_type     => sys.dbms_aqadm.normal_queue,
        max_retries    => 3,  -- retry count after a failed dequeue
        retry_delay    => 1,  -- wait before a retry
        retention_time => 0   -- do not keep messages after dequeue
      );

      -- Start the queue.
      dbms_aqadm.start_queue(queue_name => c_queue_name);
    end;
    /
    
    -- Grant AQ_USER every queue privilege on AQ_ADMIN's queue, without the
    -- right to pass the grant on.
    begin
      dbms_aqadm.grant_queue_privilege(
        privilege    => 'ALL',
        queue_name   => 'AQ_ADMIN.AQ_MSG_QUEUE4',
        grantee      => 'AQ_USER',
        grant_option => false
      );
    end;
    

    3.根据不同需求建立订阅关系, 实现了背景需求中的矩阵 aq_admin login

    --add subscribers
    -- (Re)create the three subscribers of aq_msg_queue4 so the block can be
    -- run repeatedly.
    declare
      l_q      varchar2(30) := 'aq_msg_queue4';
      l_level1 sys.aq$_agent := sys.aq$_agent(name => 'level1', address => null, protocol => 0);
      l_level2 sys.aq$_agent := sys.aq$_agent(name => 'level2', address => null, protocol => 0);
      l_level3 sys.aq$_agent := sys.aq$_agent(name => 'level3', address => null, protocol => 0);

      e_not_subscriber exception;
      -- ORA-24035: AQ agent is not a subscriber for the queue
      pragma exception_init(e_not_subscriber, -24035);

      -- Remove one subscriber, tolerating only "was not subscribed".
      -- Each agent is handled on its own so that a missing subscriber does not
      -- prevent the remaining ones from being removed.
      procedure drop_subscriber(p_agent in sys.aq$_agent) is
      begin
        dbms_aqadm.remove_subscriber(queue_name => l_q, subscriber => p_agent);
      exception
        when e_not_subscriber then
          null;
      end drop_subscriber;
    begin
      -- Start from a clean slate.
      drop_subscriber(l_level1);
      drop_subscriber(l_level2);
      drop_subscriber(l_level3);

      -- level1 has no rule: it receives every message that is enqueued without
      -- an explicit recipient list (demo only; a real setup should use a rule).
      dbms_aqadm.add_subscriber(queue_name => l_q, subscriber => l_level1);

      -- level2 is rule-based: only messages whose msg_level is p2 or p3.
      dbms_aqadm.add_subscriber(queue_name => l_q,
                                subscriber => l_level2,
                                rule       => 'tab.user_data.msg_level in (''p2'',''p3'')');

      -- level3 is rule-based: only messages whose msg_level is p3.
      dbms_aqadm.add_subscriber(queue_name => l_q,
                                subscriber => l_level3,
                                rule       => 'tab.user_data.msg_level in (''p3'')');
    end;
    /
    

    4.生成消息, 并且入队 aq_user login

    -- Enqueue three demo messages (levels p1, p2, p3) one second apart.
    declare
      c_queue       constant varchar2(50) := 'aq_admin.aq_msg_queue4';
      l_opts        dbms_aq.enqueue_options_t;
      l_props       dbms_aq.message_properties_t;
      l_only_level3 dbms_aq.aq$_recipient_list_t;

      -- Enqueue one payload with the current message properties, print the
      -- returned message id and commit.
      procedure send(p_payload in aq_admin.aq_msg_type) is
        l_msgid raw(16);
      begin
        l_opts.visibility := dbms_aq.immediate;
        dbms_aq.enqueue(
          queue_name         => c_queue,
          enqueue_options    => l_opts,
          message_properties => l_props,
          payload            => p_payload,
          msgid              => l_msgid
        );
        dbms_output.put_line('encode success,msgid is ' || l_msgid);
        commit;
      end send;
    begin
      -- p1 message, no explicit recipients.
      send(aq_admin.aq_msg_type(11, '-', '-', 'p1', 'content priority 1', sysdate));

      dbms_lock.sleep(1);
      -- p2 message, no explicit recipients.
      send(aq_admin.aq_msg_type(12, '-', '-', 'p2', 'content priority 2', sysdate));

      dbms_lock.sleep(1);
      -- p3 message: level1 has no rule and level2's rule also matches p3, but
      -- only level3 should see it, so it is addressed to level3 explicitly
      -- through the recipient list.
      l_only_level3(0) := sys.aq$_agent(name => 'level3', address => null, protocol => 0);
      l_props.recipient_list := l_only_level3;
      send(aq_admin.aq_msg_type(13, '-', '-', 'p3', 'content priority 3', sysdate));
    end;
    

    5.收取一个指定Consumer的所有消息. 这是一个主动获取的过程(Pull), 订阅者并没有得到通知 (aq_user login)

    -- Pull every message currently available to consumer 'level3' and print
    -- its header attributes. Nothing is pushed to the consumer; this block polls.
    declare
      c_queue    constant varchar2(50) := 'aq_admin.aq_msg_queue4';
      c_consumer constant varchar2(30) := 'level3';
      l_payload  aq_admin.aq_msg_type;
      l_msgid    raw(16);
      l_opts     dbms_aq.dequeue_options_t;
      l_props    dbms_aq.message_properties_t;
      e_queue_drained exception;
      pragma exception_init(e_queue_drained, -25228);
    begin
      l_opts.consumer_name := c_consumer;
      l_opts.navigation    := dbms_aq.first_message;
      l_opts.wait          := dbms_aq.no_wait;  -- do not block when nothing is left

      begin
        loop
          dbms_aq.dequeue(
            queue_name         => c_queue,
            dequeue_options    => l_opts,
            message_properties => l_props,
            payload            => l_payload,
            msgid              => l_msgid
          );
          commit;

          dbms_output.put_line('--------------------------------------- ');
          dbms_output.put_line('msg_seq :'         || l_payload.msg_seq);
          dbms_output.put_line('msg_sender :'      || l_payload.msg_sender);
          dbms_output.put_line('msg_receiver :'    || l_payload.msg_receiver);
          dbms_output.put_line('msg_level :'       || l_payload.msg_level);

          -- After the first fetch, keep walking forward.
          l_opts.navigation := dbms_aq.next_message;
        end loop;
      exception
        when e_queue_drained then
          -- ORA-25228 ends the loop: no further message for this consumer.
          null;
      end;
    end;
    ----------------输出如下-------------------
    msg_seq :13
    msg_sender :-
    msg_receiver :-
    msg_level :p3
    
    select count(1)  from aq_msg_qtab4
    -------------------------------------------------
    2
    

    6.清空消息队列(aq_admin login)

    -- Empty the queue table: stop the queue, purge with no condition (all
    -- messages) and default purge options, then start the queue again.
    declare
      c_queue_table constant varchar2(30) := 'aq_msg_qtab4';
      c_queue_name  constant varchar2(30) := 'aq_msg_queue4';
      l_purge_opts  sys.dbms_aqadm.aq$_purge_options_t;
    begin
      sys.dbms_aqadm.stop_queue(queue_name => c_queue_name);
      dbms_aqadm.purge_queue_table(
        queue_table     => c_queue_table,
        purge_condition => null,
        purge_options   => l_purge_opts
      );
      sys.dbms_aqadm.start_queue(queue_name => c_queue_name);
    end;
    

    7.修改回调过程(aq_user login)

        增加如下 3 个 过程
        AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41
        AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42
        AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43
    

    8.注册回调 aq_admin login

    -- Register one PL/SQL callback per consumer of AQ_ADMIN.AQ_MSG_QUEUE4.
    declare
      -- Register a single callback for a single "queue:consumer" subscription.
      procedure register_callback(p_subscription in varchar2,
                                  p_callback     in varchar2) is
      begin
        dbms_aq.register(
          sys.aq$_reg_info_list(
            sys.aq$_reg_info(p_subscription,
                             dbms_aq.namespace_aq,
                             p_callback,
                             hextoraw('FF'))
          ),
          1  -- number of entries in the list
        );
      end register_callback;
    begin
      register_callback('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL1',
                        'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41');
      register_callback('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL2',
                        'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42');
      register_callback('AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL3',
                        'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43');
    end;
    

    9.入队测试(aq_user)

    重复上面的步骤4
    

    10.验证数据

    -- Show the received messages per consumer, then by sequence number.
    -- NOTE(review): aq_msg_received is presumably filled by the callback
    -- procedures; its definition is not part of this article - confirm.
    select
        t.aq_msg_seq,
        t.queue_name,
        t.consumer_name,
        t.content.msg_content
    from aq_msg_received t
    order by
        t.consumer_name,
        t.aq_msg_seq
    ---------------------------------------------------------------------------------------------------
    AQ_MSG_SEQ  QUEUE_NAME                  CONSUMER_NAME           CONTENT.MSG_CONTENT
    8           "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue41->LEVEL1  content priority 1
    10          "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue41->LEVEL1  content priority 2
    9           "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue42->LEVEL2  content priority 2
    11          "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue43->LEVEL3  content priority 3
    
    

    相关文章

      网友评论

          本文标题:Oracle Advanced Queuing 之05(多订阅人

          本文链接:https://www.haomeiwen.com/subject/shvibktx.html