1) 建立消息队列基础类
<?php/** * @desc 消息队列 * @author caifangjie * @date 2016/05/03 */class queue{ //交换机名称 protected $_exchangename = 'ex_auto_home'; //队列名称 protected $_queuename = 'qu_auto_home'; //路由 protected $_routekey = 'ru_auto_home'; protected $_connecthandler; protected $_channelobject; protected $_exchangeobject; protected $_queueobject; //配置信息 protected $_config = array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'); //构造函数,依次创建通道,交换机,队列 public function __construct() { try{ $this->_connecthandler = new amqpconnection($this->_config); if(!$this->_connecthandler->connect()) { die('connect failed'); } $this->createchannel(); $this->createexchange(); $this->createqueue(); } catch(exception $e) { echo $e->getmessage(); } } //创建通道 protected function createchannel() { $this->_channelobject = new amqpchannel($this->_connecthandler); } //创建交换机 public function createexchange($exchangename='', $exchangetype=amqp_ex_type_direct) { $exname = $exchangename?$exchangename:$this->_exchangename; $this->_exchangeobject = new amqpexchange($this->_channelobject); $this->_exchangeobject->setname($exname); $this->_exchangeobject->settype($exchangetype); $this->_exchangeobject->setflags(amqp_durable | amqp_autodelete); $this->_exchangeobject->declareexchange(); } //创建队列 public function createqueue() { $this->_queueobject = new amqpqueue($this->_channelobject); $this->_queueobject->setname($this->_queuename); $this->_queueobject->setflags(amqp_durable | amqp_autodelete); $this->_queueobject->declarequeue(); $this->_queueobject->bind($this->_exchangeobject->getname(), $this->_routekey); } }<?phprequire_once 'execprocess.class.php';require_once 'queue/queue.class.php';class recv extends queue{ public function __construct() { parent::__construct(); } //接受消息 public function recvmessage() { while (true) { $this->_queueobject->consume(function(amqpenvelope $e, amqpqueue $q) { $requesturl = $e->getbody(); if ($requesturl) { // var_dump($requesturl); $exechandler = new execprocess(); $exechandler->start($requesturl); $exechandler->execsave(); unset($exechandler); $q->nack($e->getdeliverytag()); } else { usleep(100); } }); } }}$reciver = new recv();$reciver->recvmessage();<?php//require_once 'execprocess.class.php';require_once 'queue/queue.class.php';class recv extends queue{ public function __construct() { parent::__construct(); } //接受消息 public function recvmessage() { while (true) { $this->_queueobject->consume(function(amqpenvelope $e, amqpqueue $q) { $requesturl = $e->getbody(); if ($requesturl) { var_dump($requesturl);// $exechandler = new execprocess();// $exechandler->start($requesturl);// $exechandler->execsave();// unset($exechandler); $q->nack($e->getdeliverytag()); } else { usleep(100); } }); } }}$reciver = new recv();$reciver->recvmessage();
我觉得我很忧伤......这是要沉贴,翻船的节奏吗?
ExecProcess是不是出问题了什么
去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....
看你描述,应该是ExecProcess.class.php中,处理消息的部分出问题了,重点检查这部分代码,看看是什么异常。
执行繁重任务才出错,
可以检查是否执行超时导致。
PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号