美文网首页
Oracle Advanced Queuing 之04(优先级)

Oracle Advanced Queuing 之04(优先级)

作者: 轻飘飘D | 来源:发表于2020-11-13 23:02 被阅读0次

    1.创建队列表及队列并启动 aq_admin login

    declare
      -- Object names shared by the three DBMS_AQADM calls below.
      c_queue_table constant varchar2(30) := 'aq_msg_qtab3';
      c_queue_name  constant varchar2(30) := 'aq_msg_queue3';
    begin
      -- Step 1: create the queue table that stores AQ_ADMIN.AQ_MSG_TYPE payloads.
      dbms_aqadm.create_queue_table(
        queue_table        => c_queue_table,
        queue_payload_type => 'AQ_ADMIN.AQ_MSG_TYPE',
        sort_list          => 'priority,enq_time',      -- order by priority, then by enqueue time
        multiple_consumers => true,                     -- multi-consumer queue table
        comment            => 'queue for aq_msg_queue3'
      );

      -- Step 2: create the test queue inside that queue table.
      dbms_aqadm.create_queue(
        queue_name     => c_queue_name,
        queue_table    => c_queue_table,
        queue_type     => sys.dbms_aqadm.normal_queue,
        max_retries    => 3,                            -- retry count after a failed dequeue
        retry_delay    => 1,                            -- wait before a retry
        retention_time => 0                             -- do not retain messages after dequeue
      );

      -- Step 3: start the queue.
      dbms_aqadm.start_queue(queue_name => c_queue_name);
    end;
    /
    
    -- Grant every queue privilege on AQ_ADMIN.AQ_MSG_QUEUE3 to AQ_USER,
    -- without the right to pass the privilege on (grant_option => FALSE).
    begin
      dbms_aqadm.grant_queue_privilege(
        privilege    => 'ALL',
        queue_name   => 'AQ_ADMIN.AQ_MSG_QUEUE3',
        grantee      => 'AQ_USER',
        grant_option => FALSE
      );
    end;
    /
    

    2.创建消息订阅者 aq_admin login

    declare
      -- Agent describing the demo subscriber; only the name is set.
      v_subscriber sys.aq$_agent :=
        sys.aq$_agent(name => 'subscriber_xag3', address => null, protocol => 0);
    begin
      -- Attach the demo subscriber to the queue.
      dbms_aqadm.add_subscriber(
        queue_name => 'aq_msg_queue3',
        subscriber => v_subscriber
      );
    end;
    /
    

    3.入队 aq_user login

    -- Enqueue one message with priority 1 into aq_admin.aq_msg_queue3
    -- and print the message id returned by DBMS_AQ.ENQUEUE.
    declare
      v_queue_name varchar2(50);
      v_enqueue_options     dbms_aq.enqueue_options_t;
      v_message_properties  dbms_aq.message_properties_t;
      v_message_handle      raw(16);
      v_aq_msg              aq_admin.aq_msg_type;
    begin
      v_queue_name:='aq_admin.aq_msg_queue3';
      v_aq_msg := aq_admin.aq_msg_type (11,'-','-','p1','content priority 1',sysdate);
      v_message_properties.priority := 1; -- priority of this message
      v_enqueue_options.visibility :=dbms_aq.immediate;
      dbms_aq.enqueue(
          queue_name => v_queue_name
          ,enqueue_options => v_enqueue_options
          ,message_properties => v_message_properties
          ,payload => v_aq_msg
          ,msgid => v_message_handle);
      -- rawtohex makes the RAW-to-text conversion explicit; the printed hex is unchanged.
      dbms_output.put_line('encode success,msgid is ' || rawtohex(v_message_handle));
      commit;
    end;
    /
    
    --改变 priority 的值为3,2,再入列两次,现在入列的顺序为1 3 2,
    --我们希望的出列顺序为 1,2,3
     入队
     v_aq_msg := aq_admin.aq_msg_type (13,'-','-','p3','content priority 3',sysdate);
     v_message_properties.priority := 3; --该消息的优先级别
    再入队
     v_aq_msg := aq_admin.aq_msg_type (12,'-','-','p2','content priority 2',sysdate);
     v_message_properties.priority := 2; --该消息的优先级别
    
    -- Number of rows currently stored in the queue table.
    select count(*) from aq_admin.aq_msg_qtab3;
    -------------------------------------------------------
    3
    

    4.出队列(aq_user)

    -- Dequeue one message from aq_admin.aq_msg_queue3 as consumer
    -- 'subscriber_xag3' and print every payload attribute.
    -- NOTE(review): no wait option is set, so per the DBMS_AQ documentation the
    -- call presumably blocks while the queue is empty -- confirm before scripting.
    declare
     v_queue_name varchar2(50);
     v_aq_msg             aq_admin.aq_msg_type;
     v_message_handle RAW(16);
     v_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
     v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    begin
     v_dequeue_options.visibility :=DBMS_AQ.IMMEDIATE;
     v_dequeue_options.consumer_name := 'subscriber_xag3';
     v_queue_name:='aq_admin.aq_msg_queue3';

     dbms_aq.dequeue(queue_name => v_queue_name,dequeue_options => v_dequeue_options,message_properties => v_message_properties,payload => v_aq_msg,msgid => v_message_handle);
     dbms_output.put_line('--------------------------------------- ');
     dbms_output.put_line('msg_seq :'         || v_aq_msg.msg_seq);
     dbms_output.put_line('msg_sender :'      || v_aq_msg.msg_sender);
     dbms_output.put_line('msg_receiver :'    || v_aq_msg.msg_receiver);
     dbms_output.put_line('msg_level :'       || v_aq_msg.msg_level);
     dbms_output.put_line('msg_content :'     || v_aq_msg.msg_content);
     dbms_output.put_line('msg_create_time :' || to_char(v_aq_msg.msg_create_time,'yyyy-mm-dd hh24:mi:ss'));
     -- Print the time of the dequeue itself; previously this line repeated
     -- msg_create_time, so "current_time" never differed from the create time.
     dbms_output.put_line('current_time :'    || to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
     commit;
    end;
    /
    ---------------------------------------------------------------------------------
    以上出队脚本运行3次,依次得到如下结果:
    msg_seq :11
    msg_sender :-
    msg_receiver :-
    msg_level :p1
    msg_content :content priority 1
    msg_create_time :2020-11-13 22:37:08
    current_time :2020-11-13 22:37:08
    --------------------------------------- 
    msg_seq :12
    msg_sender :-
    msg_receiver :-
    msg_level :p2
    msg_content :content priority 2
    msg_create_time :2020-11-13 22:37:34
    current_time :2020-11-13 22:37:34
    --------------------------------------- 
    msg_seq :13
    msg_sender :-
    msg_receiver :-
    msg_level :p3
    msg_content :content priority 3
    msg_create_time :2020-11-13 22:37:20
    current_time :2020-11-13 22:37:20
    

    5.注册回调 aq_admin login

    begin
      -- Register the PL/SQL callback that is notified for messages arriving on
      -- AQ_ADMIN.AQ_MSG_QUEUE3 for subscriber SUBSCRIBER_XAG3.
      -- NOTE(review): the callback name ends in AQ_MSG_QUEUE1 while the queue is
      -- AQ_MSG_QUEUE3 -- presumably an existing procedure is reused; confirm.
      dbms_aq.register(
        sys.aq$_reg_info_list(
          sys.aq$_reg_info('AQ_ADMIN.AQ_MSG_QUEUE3:SUBSCRIBER_XAG3',dbms_aq.namespace_aq,
                           'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE1',hextoraw('FF'))
                          ),
          1);  -- number of entries in the registration list
    end;
    /
    

    6.入队测试(aq_user)

    
    -- Enqueue three messages with priorities 1, 3, 2 (in that order), one second
    -- apart, committing after each one.
    declare
      c_queue_name constant varchar2(50) := 'aq_admin.aq_msg_queue3';

      -- Enqueue one payload with the given priority, print its msgid, and commit.
      procedure enqueue_msg(
        p_msg      in aq_admin.aq_msg_type,
        p_priority in binary_integer
      ) is
        v_enqueue_options     dbms_aq.enqueue_options_t;
        v_message_properties  dbms_aq.message_properties_t;
        v_message_handle      raw(16);
      begin
        v_message_properties.priority := p_priority; -- priority of this message
        v_enqueue_options.visibility := dbms_aq.immediate;
        dbms_aq.enqueue(
            queue_name => c_queue_name
            ,enqueue_options => v_enqueue_options
            ,message_properties => v_message_properties
            ,payload => p_msg
            ,msgid => v_message_handle);
        dbms_output.put_line('encode success,msgid is ' || v_message_handle);
        commit;
      end enqueue_msg;
    begin
      enqueue_msg(aq_admin.aq_msg_type (11,'-','-','p1','content priority 1',sysdate), 1);

      -- Pause so each message gets a distinct create time (sysdate is re-evaluated per call).
      dbms_lock.sleep(1);
      enqueue_msg(aq_admin.aq_msg_type (13,'-','-','p3','content priority 3',sysdate), 3);

      dbms_lock.sleep(1);
      enqueue_msg(aq_admin.aq_msg_type (12,'-','-','p2','content priority 2',sysdate), 2);
    end;
    /
    

    7.验证数据 (由下面数据可得知:每条消息入队并提交后会立即触发回调出队,队列中始终只有一条待处理消息,所以先入队的先出队,优先级设置不起作用)

    -- List the received messages in the order they were recorded, so the
    -- dequeue order can be read directly from the result.
    select
        t.aq_msg_seq,
        t.queue_name,
        t.received_time,
        t.consumer_name,
        t.content.msg_content,
        t.content.msg_create_time
    from aq_msg_received t
    order by t.aq_msg_seq;
    ---------------------------------------------------------------------------------------------------
    AQ_MSG_SEQ  QUEUE_NAME                  RECEIVED_TIME       CONSUMER_NAME   CONTENT.MSG_CONTENT CONTENT.MSG_CREATE_TIME
    5           "AQ_ADMIN"."AQ_MSG_QUEUE3"  2020/11/13 22:54:30 SUBSCRIBER_XAG3 content priority 1  2020/11/13 22:54:30
    6           "AQ_ADMIN"."AQ_MSG_QUEUE3"  2020/11/13 22:54:31 SUBSCRIBER_XAG3 content priority 3  2020/11/13 22:54:31
    7           "AQ_ADMIN"."AQ_MSG_QUEUE3"  2020/11/13 22:54:32 SUBSCRIBER_XAG3 content priority 2  2020/11/13 22:54:32
    
    

    相关文章

      网友评论

          本文标题:Oracle Advanced Queuing 之04(优先级)

          本文链接:https://www.haomeiwen.com/subject/xuimbktx.html