美文网首页
php ammq拓展实例

php ammq拓展实例

作者: 不落的风筝 | 来源:发表于2019-06-21 19:13 被阅读0次

先上消费者代码consumer.php

<?php
//声明连接参数
config = array( 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ); //连接brokercnn = new AMQPConnection(config); if (!cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
//在连接内创建一个通道
ch = new AMQPChannel(cnn);
//创建一个交换机
ex = new AMQPExchange(ch);
//声明路由键
routingKey = 'key_1'; //声明交换机名称exchangeName = 'exchange_1';
//设置交换机名称
ex->setName(exchangeName);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
ex->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久ex->setFlags(AMQP_DURABLE);
//声明交换机
ex->declareExchange(); //创建一个消息队列q = new AMQPQueue(ch); //设置队列名称q->setName('queue_1');
//设置队列持久
q->setFlags(AMQP_DURABLE); //声明消息队列q->declareQueue();
//交换机和队列通过routingKey进行绑定q->bind(ex->getName(),routingKey);
//接收消息并进行处理的回调方法
function receive(envelope,queue) {
//休眠两秒,
sleep(2);
//echo消息内容
echo envelope->getBody()."\n"; //显式确认,队列收到消费者显式确认后,会删除该消息queue->ack(envelope->getDeliveryTag()); } //设置消息队列消费者回调方法,并进行阻塞q->consume("receive");

生产者代码publisher.php:

<?php
config = array( 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' );cnn = new AMQPConnection(config); if (!cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
ch = new AMQPChannel(cnn);
ex = new AMQPExchange(ch);
//消息的路由键,一定要和消费者端一致
routingKey = 'key_1'; //交换机名称,一定要和消费者端一致,exchangeName = 'exchange_1';
ex->setName(exchangeName);
ex->setType(AMQP_EX_TYPE_DIRECT);ex->setFlags(AMQP_DURABLE);
ex->declareExchange(); //创建10个消息 for (i=1;i<=10;i++){
//消息内容
msg = array( 'data' => 'message_'.i,
'hello' => 'world',
);
//发送消息到交换机,并返回发送结果
//delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
echo "Send Message:".ex->publish(json_encode(msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
//代码执行完毕后进程会自动退出
}

相关文章

网友评论

      本文标题:php ammq拓展实例

      本文链接:https://www.haomeiwen.com/subject/ksiuqctx.html