PHP 框架 Hyperf 实现处理超时未支付订单和延时队列-PHP教程-爱上资源吧
最新公告
  • 欢迎您光临爱上资源吧,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入我们
  • PHP 框架 Hyperf 实现处理超时未支付订单和延时队列-PHP教程

    延时队列

    • Delayproducer.Php

    • Amqpbuilder.Php

    AmqpBuilder.php

    arguments = array_merge($this->arguments, $arguments);
    
            return $this;
    
        }
    
        /**
    
         * 设置延时队列相关参数
    
         *
    
         * @param string $queueName
    
         * @param int    $xMessageTtl
    
         * @param string $xDeadLetterExchange
    
         * @param string $xDeadLetterRoutingKey
    
         *
    
         * @return $this
    
         */
    
        public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    
        {
            $this->setArguments([
                'x-message-ttl'             => ['I', $xMessageTtl * 1000], // 毫秒
                'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
                'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
            ]);
            $this->setQueue($queueName);
            return $this;
        }
    }

    DelayProducer.php

    produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
    
            });
    
        }
    
        /**
    
         * @param ProducerMessageInterface $producerMessage
    
         * @param AmqpBuilder              $queueBuilder
    
         * @param bool                     $confirm
    
         * @param int                      $timeout
    
         *
    
         * @return bool
    
         * @throws \Throwable
    
         */
    
        private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    
        {
    
            $result = false;
    
            $this->injectMessageProperty($producerMessage);
    
            $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
    
            $pool    = $this->getConnectionPool($producerMessage->getPoolName());
    
            /** @var \Hyperf\Amqp\Connection $connection */
    
            $connection = $pool->get();
    
            if ($confirm) {
    
                $channel = $connection->getConfirmChannel();
    
            } else {
    
                $channel = $connection->getChannel();
    
            }
    
            $channel->set_ack_handler(function () use (&$result)
    
            {
    
                $result = true;
    
            });
    
            try {
    
                // 处理延时队列
    
                $exchangeBuilder = $producerMessage->getExchangeBuilder();
    
                // 队列定义
    
                $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
    
                // 路由定义
    
                $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
    
                // 队列绑定
    
                $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
    
                // 消息发送
    
                $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
    
                $channel->wait_for_pending_acks_returns($timeout);
    
            } catch (Throwable $exception) {
    
                // Reconnect the connection before release.
    
                $connection->reconnect();
    
                throw $exception;
    
            }
    
            finally {
    
                $connection->release();
    
            }
    
            return $confirm ? $result : true;
    
        }
    
        /**
    
         * @param ProducerMessageInterface $producerMessage
    
         */
    
        private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    
        {
    
            if (class_exists(AnnotationCollector::class)) {
    
                /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
    
                $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
    
                if ($annotation) {
    
                    $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
    
                    $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
    
                }
    
            }
    
        }
    
    }

    处理超时订单

    • Orderqueueconsumer.Php

    • Orderqueueproducer.Php

    Orderqueueproducer.php

    payload = $data;
    
        }
    
        public function getExchangeBuilder() : ExchangeBuilder
    
        {
    
            return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
    
        }
    
    }

    Orderqueueconsumer.php

    Demo

    $builder = new AmqpBuilder();
    
            $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
    
            $que = ApplicationContext::getContainer()->get(DelayProducer::class);
    
            var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))


    以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多请关注php中文网其它相关文章!



    1. 本站所有资源来源于用户上传和网络,因此不包含技术服务请大家谅解!如有侵权请邮件联系客服!kuq@kuqshw.com
    2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理,有奖励!
    3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!
    4. 如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!

    爱上资源吧 » PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

    发表评论

    © 2015-2020 爱上资源吧 - 由 互联无限 赞助 武汉类森科技有限公司 & LaySNS Theme. All rights reserved kuq@kuqshw.com

    本站所有资源均采集网络 如有侵权、不妥,请第一时间联系我们【kuq@kuqshw.com】删除。敬请谅解!

    XML地图 | 站长导航