Subscribe
池塘🐟
Search
Sign up
php
laravel
mysql
docker
redis
rabbitmq
go
js
其他
php--rabbitmq使用,死信队列,延迟队列
##### 创建exchange,queue,接收消息,确认消息

```php
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setLogin('guest');
$connection->setPassword('guest');
$connection->connect();

// Create and declare channel
$channel = new AMQPChannel($connection);

// Dead-letter exchange. Its name must equal the 'x-dead-letter-exchange'
// argument that the producer below sets on queue 'hello_1'; otherwise expired
// messages are never routed to the queue consumed here.
$exchange = new AMQPExchange($channel);
$exchange->setName('dead_que');
$exchange->setFlags(AMQP_DURABLE);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// Creates the exchange if it does not exist; throws if it already exists with
// a different configuration.
$exchange->declareExchange();

$queue_name = 'hello';
$queue = new AMQPQueue($channel);
$queue->setName($queue_name);
$queue->setFlags(AMQP_NOPARAM);
// Creates the queue if it does not exist; throws if it already exists with a
// different configuration.
$queue->declareQueue();
// Bind to the dead-letter exchange. 'hello_d' is the routing key and must equal
// the producer's 'x-dead-letter-routing-key'. The queue has to be declared
// before it can be bound.
$queue->bind('dead_que', 'hello_d');

$callback_func = function (AMQPEnvelope $message, AMQPQueue $q) {
    echo PHP_EOL, "------------", PHP_EOL;
    echo " [x] Received ", $message->getBody(), PHP_EOL;
    echo PHP_EOL, "------------", PHP_EOL;
    // Manual acknowledgement. nack() is NOT an "automatic ack": without
    // AMQP_REQUEUE it rejects the message. For fire-and-forget consumption
    // pass AMQP_AUTOACK to consume() instead of acknowledging here.
    $q->ack($message->getDeliveryTag());
    // 1 ms pause; sleep() only takes whole seconds, so sleep(0.001) was sleep(0).
    usleep(1000);
};

echo ' [*] Waiting for messages. To exit press CTRL+C ', PHP_EOL;
$queue->consume($callback_func); // Blocks while consuming messages.
echo 'Close connection...', PHP_EOL;
$queue->cancel();
$connection->disconnect();
```

##### 发送消息

```php
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setLogin('guest');
$connection->setPassword('guest');
$connection->connect();

// Create and declare channel
$channel = new AMQPChannel($connection);
// No name is set on this exchange object, so publish() below targets the
// broker's default exchange, which routes by queue name.
$exchange = new AMQPExchange($channel);
$routing_key = 'hello_1';

try {
    $queue_name = 'hello_1';
    $queue = new AMQPQueue($channel);
    $queue->setName($queue_name);
    $queue->setFlags(AMQP_DURABLE); // Durable queue: survives a RabbitMQ restart.
    $queue->setArgument('x-message-ttl', 5000); // Message TTL in milliseconds (5 s).
    $queue->setArgument('x-dead-letter-exchange', 'dead_que'); // Dead-letter exchange.
    $queue->setArgument('x-dead-letter-routing-key', 'hello_d'); // Routing key the dead-letter queue is bound with.
    $queue->declareQueue(); // Creates queue hello_1 and associates it with the dead-letter exchange.

    $message = '{"order_id":1}';
    // delivery_mode = 2 marks the message persistent (survives a RabbitMQ restart).
    $exchange->publish($message, $routing_key, AMQP_NOPARAM, ['delivery_mode' => 2]);
} catch (Exception $ex) {
    print_r($ex);
} finally {
    // Release the connection on the failure path as well.
    $connection->disconnect();
}
```

##### 使用示例

```php
<?php

namespace Common\Util\Mq;

/**
 * Thin convenience wrapper around the php-amqp extension: one connection, one
 * channel, and an unnamed exchange object used for publishing.
 */
class RabbitMq
{
    protected $amqpConnect;
    protected $channel;
    protected $exchange;

    /**
     * Connects to the broker and opens a channel.
     *
     * @param $host
     * @param $login
     * @param $password
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function __construct($host, $login, $password)
    {
        $connection = new \AMQPConnection();
        $connection->setHost($host);
        $connection->setLogin($login);
        $connection->setPassword($password);
        $connection->connect();

        $this->amqpConnect = $connection;
        $this->channel = new \AMQPChannel($this->amqpConnect);
        $this->exchange = new \AMQPExchange($this->channel);
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->amqpConnect->disconnect();
    }

    /**
     * Declares an exchange. Defaults: type AMQP_EX_TYPE_DIRECT, flags AMQP_DURABLE.
     *
     * @param $exchangeName
     * @param $type
     * @param $flags
     * @param array $arguments
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function createExchange($exchangeName, $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_DURABLE, $arguments = [])
    {
        // Work on a clone so that declaring an exchange does not rename the
        // exchange object that publish() uses.
        $exchange = clone $this->exchange;
        $exchange->setName($exchangeName);
        $exchange->setType($type);
        $exchange->setFlags($flags);
        $exchange->setArguments($arguments);
        $exchange->declareExchange();
    }

    /**
     * Declares a queue. Default flags: AMQP_DURABLE.
     *
     * The previous default, AMQP_EX_TYPE_DIRECT, is an exchange *type* string
     * rather than a flag bitmask and is not a valid argument for setFlags().
     *
     * @param $queueName
     * @param int $flags Bitmask of AMQP_DURABLE, AMQP_PASSIVE, AMQP_EXCLUSIVE, AMQP_AUTODELETE
     * @param array $arguments
     * @return \AMQPQueue Returned so the caller can bind it to an exchange.
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function createQueue($queueName, $flags = AMQP_DURABLE, $arguments = [])
    {
        $queue = new \AMQPQueue($this->channel);
        $queue->setName($queueName);
        $queue->setFlags($flags);
        $queue->setArguments($arguments);
        $queue->declareQueue();

        return $queue;
    }

    /**
     * Publishes a message through the wrapper's unnamed exchange object.
     * Default flags: AMQP_NOPARAM; default attributes mark the message persistent.
     *
     * @param $routingKey
     * @param $message
     * @param int $flags
     * @param array $attributes
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function publish($routingKey, $message, $flags = AMQP_NOPARAM, $attributes = ['delivery_mode' => 2])
    {
        $this->exchange->publish($message, $routingKey, $flags, $attributes);
    }

    /**
     * Consumes messages from the named queue. Any exception raised while
     * consuming is printed, not rethrown.
     *
     * @param $queueName
     * @param \Closure $callback Handler for each message; defaults to callback().
     */
    public function consume($queueName, \Closure $callback = null)
    {
        try {
            $queue = new \AMQPQueue($this->channel);
            $queue->setName($queueName);
            echo ' [*] Waiting for messages. To exit press CTRL+C ', PHP_EOL;
            $queue->consume($callback ?: $this->callback());
        } catch (\Exception $ex) {
            // Also covers \AMQPQueueException, which was handled identically.
            print_r($ex);
        }
    }

    /**
     * Default message handler: prints the body and delivery tag, then
     * acknowledges the message.
     *
     * @return \Closure
     */
    public function callback()
    {
        return function (\AMQPEnvelope $message, \AMQPQueue $q) {
            echo PHP_EOL, "------------", PHP_EOL;
            echo " [x] Received ", $message->getBody(), PHP_EOL;
            echo PHP_EOL, "------------" . $message->getDeliveryTag(), PHP_EOL;
            // Acknowledge the message. The earlier nack() call rejected every
            // message instead of confirming it.
            $q->ack($message->getDeliveryTag());
        };
    }
}

// NOTE: consume() blocks, so run each example below as its own script.
// The class is referenced by its short name because this code sits inside
// namespace Common\Util\Mq; a relative "Common\Util\Mq\RabbitMq" here would
// resolve to Common\Util\Mq\Common\Util\Mq\RabbitMq.

// Plain queue example
$rabbitmq = new RabbitMq('127.0.0.1', 'guest', 'guest');
$rabbitmq->createExchange('test_1');
$rabbitmq->createQueue('test_1');
$rabbitmq->publish('test_1', '11111'); // Publish a message.
//$rabbitmq->close();
$rabbitmq->consume('test_1'); // Consume messages.

// Dead-letter queue example
$rabbitmq = new RabbitMq('127.0.0.1', 'guest', 'guest');
// Create the dead-letter exchange.
$rabbitmq->createExchange('dead_que');
// Create the normal queue; messages expire after five seconds.
$rabbitmq->createQueue('test_2', AMQP_DURABLE, [
    'x-message-ttl' => 5000,
    'x-dead-letter-exchange' => 'dead_que',
    'x-dead-letter-routing-key' => 'dead_key',
]);
// Create the dead-letter queue and bind it to the dead-letter exchange.
$queue = $rabbitmq->createQueue('test_dead', AMQP_DURABLE);
$queue->bind('dead_que', 'dead_key');
// Publish messages to the normal queue.
for ($i = 1; $i <= 10000; $i++) {
    $rabbitmq->publish('test_2', 'message' . $i);
}
// Close the connection.
//$rabbitmq->close();
// Consume the dead-letter queue.
$rabbitmq->consume('test_dead');
```
提交评论
提交