Created
February 20, 2019 15:26
-
-
Save dastanaron/d541c2dbb0c272e5fcb3cb7d5d94f746 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Adapters; | |
use Vendor\PhpAmqpLib\Connection\AMQPStreamConnection; | |
use Vendor\PhpAmqpLib\Message\AMQPMessage; | |
class RabbitAdapter | |
{ | |
/** | |
* @var \Vendor\PhpAmqpLib\Channel\AMQPChannel | |
*/ | |
protected $channel; | |
/** | |
* @var AMQPStreamConnection | |
*/ | |
protected $connection; | |
/** | |
* RabbitAdapter constructor. | |
* @throws \Fabrikant\Exception | |
*/ | |
public function __construct() | |
{ | |
$this->connection = $this->getConnection(); | |
$this->channel = $this->connection->channel(); | |
} | |
/** | |
* @return RabbitAdapter | |
* @throws \Fabrikant\Exception | |
*/ | |
public static function get() | |
{ | |
return new self(); | |
} | |
/** | |
* @return \Vendor\PhpAmqpLib\Channel\AMQPChannel | |
*/ | |
public function getChannel() | |
{ | |
return $this->channel; | |
} | |
/** | |
* @param $queue | |
* @param bool $passive | |
* @param bool $durable | |
* @param bool $exclusive | |
* @param bool $autoDelete | |
* @param bool $nowait | |
* @param array $arguments | |
* @param null $ticket | |
*/ | |
public function declareQueue | |
( | |
$queue, | |
$passive = false, | |
$durable = true, | |
$exclusive = false, | |
$autoDelete = false, | |
$nowait = false, | |
$arguments = array(), | |
$ticket = null | |
) | |
{ | |
$this->channel->queue_declare($queue, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket); | |
} | |
/** | |
* @param $exchange | |
* @param string $type | |
* @param bool $passive | |
* @param bool $durable | |
* @param bool $autoDelete | |
* @param bool $internal | |
* @param bool $nowait | |
* @param array $arguments | |
* @param null $ticket | |
*/ | |
public function declareExchange( | |
$exchange, | |
$type = 'direct', | |
$passive = false, | |
$durable = true, | |
$autoDelete = false, | |
$internal = false, | |
$nowait = false, | |
$arguments = array(), | |
$ticket = null | |
) | |
{ | |
$this->channel->exchange_declare( | |
$exchange, | |
$type, | |
$passive, | |
$durable, | |
$autoDelete, | |
$internal, | |
$nowait, | |
$arguments, | |
$ticket | |
); | |
} | |
/** | |
* @param Message $message | |
* @param string $exchange | |
* @param string $routing_key | |
* @param bool $mandatory | |
* @param bool $immediate | |
* @param null $ticket | |
*/ | |
public function pushMessage(AMQPMessage $message, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null) | |
{ | |
$this->channel->basic_publish($message, $exchange, $routing_key, $mandatory, $immediate, $ticket); | |
} | |
/** | |
* @param $queue | |
* @param $exchange | |
* @param null $routingKey | |
*/ | |
public function queueBind($queue, $exchange, $routingKey = null) | |
{ | |
!is_null($routingKey) || $routingKey = $queue; | |
$this->channel->queue_bind( | |
$queue, | |
$exchange, | |
$routingKey | |
); | |
} | |
/** | |
* Удаляет очередь | |
* @param string $queue | |
* @param bool $if_unused | |
* @param bool $if_empty | |
* @param bool $nowait | |
* @param null $ticket | |
* | |
* @return mixed|null | |
*/ | |
public function deleteQueue($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null) | |
{ | |
return $this->channel->queue_delete($queue, $if_unused, $if_empty, $nowait, $ticket); | |
} | |
/** | |
* Удаляет обменник | |
* @param $exchange | |
* @param bool $if_unused | |
* @param bool $nowait | |
* @param null $ticket | |
* | |
* @return mixed|null | |
*/ | |
public function deleteExchange($exchange, $if_unused = false, $nowait = false, $ticket = null) | |
{ | |
return $this->channel->exchange_delete($exchange, $if_unused, $nowait, $ticket); | |
} | |
/** | |
* @return AMQPStreamConnection | |
* @throws \Fabrikant\Exception | |
*/ | |
protected function getConnection() | |
{ | |
$config = [];//your config | |
$connection = new AMQPStreamConnection( | |
$config['host'], | |
$config['port'], | |
$config['login'], | |
$config['password'], | |
$config['vhost'] | |
); | |
return $connection; | |
} | |
/** | |
* Destructor | |
* close connections | |
*/ | |
public function __destruct() | |
{ | |
$this->channel->close(); | |
$this->connection->close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment