Publishing a message
// Publishes one JSON message to the "cache" exchange/queue pair. The queue is
// declared with a dead-letter exchange, so messages dead-lettered from
// cache_queue are handed over to delay_exchange.
$cacheExchange = 'cache_exchange';
$cacheQueue = 'cache_queue';
$routingKey = 'cache_exchange';

// Message body; the timestamp makes it easy to compare publish and consume times.
$payload = ['keyword' => '123 hello word, dlxm, time = ' . date('h:i:sa')];
$payload['_uuid'] = $this->genMessageCode($cacheExchange, $routingKey, $payload);

// Queue arguments. The dead-letter exchange is the essential setting: it names
// the exchange that takes over a message once it has expired in this queue.
$queueArguments = new AMQPTable();
$queueArguments->set('x-dead-letter-exchange', 'delay_exchange');
// The author left 'x-dead-letter-routing-key' and 'x-message-ttl' disabled, so
// neither is set on the queue here.
// NOTE(review): no expiry is configured in this snippet — confirm where the
// message TTL comes from, otherwise nothing will ever be dead-lettered.

$publishOptions = [
    'exchange' => $cacheExchange,
    'exchange_type' => 'fanout',
    'exchange_durable' => true,
    'routing' => $routingKey,
    'queue' => $cacheQueue,
    'queue_passive' => false,
    'queue_durable' => true,
    'queue_exclusive' => false,
    'queue_auto_delete' => false,
    'queue_properties' => $queueArguments,
];

$publisher = new Amqp();
$publisher->publish($routingKey, json_encode($payload), $publishOptions);
Consuming messages
// Consumes from 'delay_exchange' (bound to the fanout exchange of the same
// name), acknowledges each delivery and prints a timestamped line for it.
$onDelivery = function ($message, $resolver) {
    // Read what we need from the delivery before acknowledging it.
    $routingKey = $message->delivery_info['routing_key'];
    $body = $message->body;

    $resolver->acknowledge($message);

    // Output format: <time>@<routing key>:<body>consume_success
    $line = date('h:i:sa') . '@' . $routingKey . ':' . $body . "consume_success\n";
    echo $line;
};

$consumeOptions = [
    'exchange' => 'delay_exchange',
    'exchange_type' => 'fanout',
];

$consumer = new Amqp();
$consumer->consume('delay_exchange', $onDelivery, $consumeOptions);
网友评论