美文网首页
Rabbit MQ Python Version

Rabbit MQ Python Version

作者: 阿猫阿狗py | 来源:发表于2020-09-23 17:12 被阅读0次

    producer

    # encoding=utf-8
    import pika
    import json
    
    # Name of the exchange every publisher and consumer in this module uses.
    system_exchange = "exchange"

    # Broker login; user name and password are left blank in this listing.
    credentials = pika.PlainCredentials('', '')

    # Host, port, virtual host and credentials for the broker connection.
    # The port argument was missing, which is a syntax error; 5672 is the
    # default AMQP port. Host and virtual host are left blank in this listing.
    parameters = pika.ConnectionParameters('', 5672, '', credentials)
    
    
    class MessageQueue(object):
        """Thin handle for one named queue on an already-open channel.

        :param channel: channel object used for every broker call.
        :param queue_name: name of the queue this handle operates on.
        """

        def __init__(self, channel, queue_name):
            self.channel = channel
            self.queue_name = queue_name
            # Durability flag passed along when the queue is re-declared in
            # message_count().
            self.durable = False

        def get_message(self):
            """Fetch a single message with ``basic_get``.

            :return: whatever ``channel.basic_get`` returns for this queue.
            """
            return self.channel.basic_get(self.queue_name)

        def message_count(self):
            """Return the number of messages reported for this queue.

            The count is read from the reply to a ``queue_declare`` call, so
            every invocation re-declares the queue with ``self.durable``.
            """
            declared = self.channel.queue_declare(self.queue_name, durable=self.durable)
            return declared.method.message_count

        def set_consumer(self, callback, auto_ack=True):
            """Register ``callback`` as a consumer of this queue.

            :param callback: invoked for each delivered message.
            :param auto_ack: forwarded unchanged to ``basic_consume``.
            :return: the value returned by ``basic_consume`` (the consumer
                tag), so the caller can cancel the consumer later. It used to
                be discarded.
            """
            return self.channel.basic_consume(
                on_message_callback=callback,
                queue=self.queue_name,
                auto_ack=auto_ack,
            )
    
    
    class MessageChannel(object):
        """Connection plus channel wrapper bound to ``system_exchange``.

        Constructing an instance opens a blocking connection with the
        module-level ``parameters``, opens a channel on it, declares
        ``system_exchange`` as a topic exchange and sets a prefetch count
        of 1 on the channel.
        """

        def __init__(self):
            self.connection = pika.BlockingConnection(parameters)
            # Let pika choose the channel number.
            self.channel = self.connection.channel()
            # Creates the exchange if it is missing; otherwise the broker
            # checks that the existing one has the expected type.
            self.channel.exchange_declare(exchange=system_exchange, exchange_type='topic')
            self.channel.basic_qos(prefetch_count=1)

        def define_queue(self, queue_name, routing_key=None, exclusive=False):
            """Declare a queue and optionally bind it to ``system_exchange``.

            :param queue_name: name of the queue to declare.
            :param routing_key: when truthy, the queue is bound to
                ``system_exchange`` with this key; otherwise no binding is
                made by this call.
            :param exclusive: forwarded to ``queue_declare``.
            :return: a ``MessageQueue`` sharing this object's channel.
            """
            self.channel.queue_declare(queue=queue_name, durable=False, exclusive=exclusive)
            if routing_key:
                self.channel.queue_bind(queue=queue_name, exchange=system_exchange, routing_key=routing_key)
            return MessageQueue(self.channel, queue_name)

        def publish(self, routing_key, msg):
            """Publish ``msg`` to ``system_exchange`` and print a confirmation.

            :param routing_key: routing key attached to the message.
            :param msg: message body passed to ``basic_publish``.
            """
            self.channel.basic_publish(body=msg, exchange=system_exchange, routing_key=routing_key)
            print(" [x] Sent %s" % msg)

        def start_consuming(self):
            """Delegate to the channel's ``start_consuming``."""
            self.channel.start_consuming()

        def close(self):
            """Close the channel, then the connection.

            The connection is closed even when closing the channel raises, so
            a failed channel shutdown no longer leaks the connection. The
            original exception still propagates to the caller.
            """
            try:
                self.channel.close()
            finally:
                self.connection.close()
    
    
    if __name__ == "__main__":
        # Producer: keep the "kaola" queue topped up with URL jobs.
        import time

        # The queue name is also used as the routing key. Without a binding,
        # the topic exchange has nowhere to route the messages and they never
        # reach the queue.
        queue_name = "kaola"
        channel = MessageChannel()
        try:
            # Declaring and binding once is enough; message_count()
            # re-declares the queue on every call anyway.
            mq = channel.define_queue(queue_name, routing_key=queue_name)
            send = 0
            # NOTE(review): `<=` means 31 messages are published in total;
            # confirm that is intended.
            while send <= 30:
                if mq.message_count() < 10:
                    channel.publish(queue_name, json.dumps({'url': ''}))
                    send += 1
                else:
                    # Queue is full enough: pause instead of hammering the
                    # broker with back-to-back declare calls.
                    time.sleep(0.5)
        finally:
            # Release the channel and connection even if publishing fails.
            channel.close()
    
    
    
    

    consumer & producer

    # encoding=utf-8
    import pika
    import asyncio
    import time
    
    from pyppeteer import launch
    
    # credentials = pika.PlainCredentials('', '')
    # parameters = pika.ConnectionParameters('', 5672, '', credentials)
    # connection = pika.BlockingConnection(parameters)
    # channel = connection.channel()
    
    # channel.queue_declare(queue='hello')
    
    from my_daniel import MessageChannel, MessageQueue
    
    # Startup banner, printed when the module is loaded, ahead of the consumer
    # setup in the __main__ block below.
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    def open_url(url):
        """Load ``url`` in a pyppeteer-driven browser and return the page HTML.

        Each call runs on its own event loop, which is installed as the
        current loop for the duration of the call and closed afterwards.

        :param url: address passed to ``page.goto``.
        :return: the value of ``page.content()`` after a six-second wait.
        """
        async def get_data(url):
            browser = await launch(headless=False, userDataDir='./userdata', args=['--disable-infobars'])
            try:
                # Result is discarded. NOTE(review): looks unnecessary;
                # confirm before removing.
                await browser.pages()
                page = await browser.newPage()
                await page.goto(url)
                await page.setViewport({
                    'width': 1350,
                    'height': 850
                })
                # Make navigator.webdriver report false in the loaded page.
                await page.evaluate(
                    '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
                # Wait before reading the page. asyncio.sleep is used instead
                # of time.sleep so the event loop keeps running meanwhile.
                await asyncio.sleep(6)
                page_source = await page.content()
                await page.close()
                return page_source
            finally:
                # Close the browser even when navigation or evaluation fails.
                await browser.close()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(get_data(url))
        finally:
            # A fresh loop is created per call; close it so a long-running
            # consumer does not accumulate unclosed loops.
            loop.close()
    
    
    def callback(ch, method, properties, body):
        """Handle one delivery: fetch the page at ``body['url']`` and publish it.

        ``body`` is expected to be a JSON object with a ``url`` key. The page
        HTML is published with routing key ``"harvested"`` as
        ``{"page_source": ...}`` through the module-level ``channel``.
        Messages that are not valid JSON, or that lack ``url``, are reported
        and skipped so that one bad message does not raise out of the
        consumer callback.

        :param ch: unused.
        :param method: unused.
        :param properties: unused.
        :param body: raw message payload.
        """
        import json

        try:
            payload = json.loads(body)
            url = payload['url']
        except (ValueError, KeyError, TypeError) as exc:
            # ValueError: invalid JSON or bytes that cannot be decoded.
            # KeyError: no 'url' key. TypeError: payload is not a JSON object.
            print(" [!] Skipping malformed message %r: %s" % (body, exc))
            return

        print(url)
        print(" [x] Received %r" % (payload,))
        page_source = open_url(url)
        channel.publish("harvested", json.dumps({"page_source": page_source}))
    
    
    if __name__ == '__main__':
        # Worker: consume URL jobs from "kaola" until interrupted.
        # `channel` stays a module-level name because callback() publishes
        # through it.
        channel = MessageChannel()
        try:
            mq = channel.define_queue("kaola")
            mq.set_consumer(callback)
            channel.start_consuming()
        except KeyboardInterrupt:
            # The banner tells the user to press CTRL+C; treat it as a normal
            # shutdown rather than printing a traceback.
            pass
        finally:
            # Previously skipped whenever start_consuming() raised.
            channel.close()
    
    

    consumer

    # encoding:utf-8
    from my_daniel import MessageChannel
    # Consumer for harvested pages: prints every message body it receives.
    receive_channel = MessageChannel()


    def callback(ch, method, properties, body):
        """Print the raw body of each delivered message."""
        print(body)


    try:
        # Bind with the same routing key the worker publishes under
        # ("harvested"); an unbound queue receives nothing from the exchange.
        mq = receive_channel.define_queue("harvested", routing_key="harvested")
        # Show the backlog before the consumer starts draining it.
        print(mq.message_count())
        # Without a registered consumer, start_consuming() has nothing to
        # serve and callback() is never called.
        mq.set_consumer(callback)
        receive_channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C is the expected way to stop the consumer.
        pass
    finally:
        receive_channel.close()
    

    相关文章

      网友评论

          本文标题:Rabbit MQ Python Version

          本文链接:https://www.haomeiwen.com/subject/regnyktx.html