1
0
mirror of https://github.com/newnius/YAO-portal.git synced 2025-12-18 02:36:43 +00:00

init & add agent & add job

This commit is contained in:
2019-01-15 10:02:28 +08:00
parent 71f1f10e2c
commit d0a4b891b5
321 changed files with 24657 additions and 1 deletions

View File

@@ -0,0 +1,226 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\CommunicationException;
use Predis\Protocol\ProtocolException;
/**
* Base class with the common logic used by connection classes to communicate
* with Redis.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
abstract class AbstractConnection implements NodeConnectionInterface
{
private $resource;
private $cachedId;
protected $parameters;
protected $initCommands = array();
/**
* @param ParametersInterface $parameters Initialization parameters for the connection.
*/
public function __construct(ParametersInterface $parameters)
{
$this->parameters = $this->assertParameters($parameters);
}
/**
* Disconnects from the server and destroys the underlying resource when
* PHP's garbage collector kicks in.
*/
public function __destruct()
{
$this->disconnect();
}
/**
* Checks some of the parameters used to initialize the connection.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @throws \InvalidArgumentException
*
* @return ParametersInterface
*/
abstract protected function assertParameters(ParametersInterface $parameters);
/**
* Creates the underlying resource used to communicate with Redis.
*
* @return mixed
*/
abstract protected function createResource();
/**
* {@inheritdoc}
*/
public function isConnected()
{
return isset($this->resource);
}
/**
* {@inheritdoc}
*/
public function connect()
{
if (!$this->isConnected()) {
$this->resource = $this->createResource();
return true;
}
return false;
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
unset($this->resource);
}
/**
* {@inheritdoc}
*/
public function addConnectCommand(CommandInterface $command)
{
$this->initCommands[] = $command;
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
$this->writeRequest($command);
return $this->readResponse($command);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
return $this->read();
}
/**
* Helper method that returns an exception message augmented with useful
* details from the connection parameters.
*
* @param string $message Error message.
*
* @return string
*/
private function createExceptionMessage($message)
{
$parameters = $this->parameters;
if ($parameters->scheme === 'unix') {
return "$message [$parameters->scheme:$parameters->path]";
}
if (filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
return "$message [$parameters->scheme://[$parameters->host]:$parameters->port]";
}
return "$message [$parameters->scheme://$parameters->host:$parameters->port]";
}
/**
* Helper method to handle connection errors.
*
* @param string $message Error message.
* @param int $code Error code.
*/
protected function onConnectionError($message, $code = null)
{
CommunicationException::handle(
new ConnectionException($this, static::createExceptionMessage($message), $code)
);
}
/**
* Helper method to handle protocol errors.
*
* @param string $message Error message.
*/
protected function onProtocolError($message)
{
CommunicationException::handle(
new ProtocolException($this, static::createExceptionMessage($message))
);
}
/**
* {@inheritdoc}
*/
public function getResource()
{
if (isset($this->resource)) {
return $this->resource;
}
$this->connect();
return $this->resource;
}
/**
* {@inheritdoc}
*/
public function getParameters()
{
return $this->parameters;
}
/**
* Gets an identifier for the connection.
*
* @return string
*/
protected function getIdentifier()
{
if ($this->parameters->scheme === 'unix') {
return $this->parameters->path;
}
return "{$this->parameters->host}:{$this->parameters->port}";
}
/**
* {@inheritdoc}
*/
public function __toString()
{
if (!isset($this->cachedId)) {
$this->cachedId = $this->getIdentifier();
}
return $this->cachedId;
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array('parameters', 'initCommands');
}
}

View File

@@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\Connection\AggregateConnectionInterface;
/**
* Defines a cluster of Redis servers formed by aggregating multiple connection
* instances to single Redis nodes.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface ClusterInterface extends AggregateConnectionInterface
{
}

View File

@@ -0,0 +1,509 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\ClientException;
use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\Connection\ConnectionException;
use Predis\Connection\FactoryInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\Replication\MissingMasterException;
use Predis\Replication\ReplicationStrategy;
use Predis\Response\ErrorInterface as ResponseErrorInterface;
/**
* Aggregate connection handling replication of Redis nodes configured in a
* single master / multiple slaves setup.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class MasterSlaveReplication implements ReplicationInterface
{
/**
* @var ReplicationStrategy
*/
protected $strategy;
/**
* @var NodeConnectionInterface
*/
protected $master;
/**
* @var NodeConnectionInterface[]
*/
protected $slaves = array();
/**
* @var NodeConnectionInterface
*/
protected $current;
/**
* @var bool
*/
protected $autoDiscovery = false;
/**
* @var FactoryInterface
*/
protected $connectionFactory;
/**
* {@inheritdoc}
*/
public function __construct(ReplicationStrategy $strategy = null)
{
$this->strategy = $strategy ?: new ReplicationStrategy();
}
/**
* Configures the automatic discovery of the replication configuration on failure.
*
* @param bool $value Enable or disable auto discovery.
*/
public function setAutoDiscovery($value)
{
if (!$this->connectionFactory) {
throw new ClientException('Automatic discovery requires a connection factory');
}
$this->autoDiscovery = (bool) $value;
}
/**
* Sets the connection factory used to create the connections by the auto
* discovery procedure.
*
* @param FactoryInterface $connectionFactory Connection factory instance.
*/
public function setConnectionFactory(FactoryInterface $connectionFactory)
{
$this->connectionFactory = $connectionFactory;
}
/**
* Resets the connection state.
*/
protected function reset()
{
$this->current = null;
}
/**
* {@inheritdoc}
*/
public function add(NodeConnectionInterface $connection)
{
$alias = $connection->getParameters()->alias;
if ($alias === 'master') {
$this->master = $connection;
} else {
$this->slaves[$alias ?: "slave-$connection"] = $connection;
}
$this->reset();
}
/**
* {@inheritdoc}
*/
public function remove(NodeConnectionInterface $connection)
{
if ($connection->getParameters()->alias === 'master') {
$this->master = null;
$this->reset();
return true;
} else {
if (($id = array_search($connection, $this->slaves, true)) !== false) {
unset($this->slaves[$id]);
$this->reset();
return true;
}
}
return false;
}
/**
* {@inheritdoc}
*/
public function getConnection(CommandInterface $command)
{
if (!$this->current) {
if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
$this->current = $slave;
} else {
$this->current = $this->getMasterOrDie();
}
return $this->current;
}
if ($this->current === $master = $this->getMasterOrDie()) {
return $master;
}
if (!$this->strategy->isReadOperation($command) || !$this->slaves) {
$this->current = $master;
}
return $this->current;
}
/**
* {@inheritdoc}
*/
public function getConnectionById($connectionId)
{
if ($connectionId === 'master') {
return $this->master;
}
if (isset($this->slaves[$connectionId])) {
return $this->slaves[$connectionId];
}
return;
}
/**
* {@inheritdoc}
*/
public function switchTo($connection)
{
if (!$connection instanceof NodeConnectionInterface) {
$connection = $this->getConnectionById($connection);
}
if (!$connection) {
throw new \InvalidArgumentException('Invalid connection or connection not found.');
}
if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
throw new \InvalidArgumentException('Invalid connection or connection not found.');
}
$this->current = $connection;
}
/**
* Switches to the master server.
*/
public function switchToMaster()
{
$this->switchTo('master');
}
/**
* Switches to a random slave server.
*/
public function switchToSlave()
{
$connection = $this->pickSlave();
$this->switchTo($connection);
}
/**
* {@inheritdoc}
*/
public function getCurrent()
{
return $this->current;
}
/**
* {@inheritdoc}
*/
public function getMaster()
{
return $this->master;
}
/**
* Returns the connection associated to the master server.
*
* @return NodeConnectionInterface
*/
private function getMasterOrDie()
{
if (!$connection = $this->getMaster()) {
throw new MissingMasterException('No master server available for replication');
}
return $connection;
}
/**
* {@inheritdoc}
*/
public function getSlaves()
{
return array_values($this->slaves);
}
/**
* Returns the underlying replication strategy.
*
* @return ReplicationStrategy
*/
public function getReplicationStrategy()
{
return $this->strategy;
}
/**
* Returns a random slave.
*
* @return NodeConnectionInterface
*/
protected function pickSlave()
{
if ($this->slaves) {
return $this->slaves[array_rand($this->slaves)];
}
}
/**
* {@inheritdoc}
*/
public function isConnected()
{
return $this->current ? $this->current->isConnected() : false;
}
/**
* {@inheritdoc}
*/
public function connect()
{
if (!$this->current) {
if (!$this->current = $this->pickSlave()) {
if (!$this->current = $this->getMaster()) {
throw new ClientException('No available connection for replication');
}
}
}
$this->current->connect();
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
if ($this->master) {
$this->master->disconnect();
}
foreach ($this->slaves as $connection) {
$connection->disconnect();
}
}
/**
* Handles response from INFO.
*
* @param string $response
*
* @return array
*/
private function handleInfoResponse($response)
{
$info = array();
foreach (preg_split('/\r?\n/', $response) as $row) {
if (strpos($row, ':') === false) {
continue;
}
list($k, $v) = explode(':', $row, 2);
$info[$k] = $v;
}
return $info;
}
/**
* Fetches the replication configuration from one of the servers.
*/
public function discover()
{
if (!$this->connectionFactory) {
throw new ClientException('Discovery requires a connection factory');
}
RETRY_FETCH: {
try {
if ($connection = $this->getMaster()) {
$this->discoverFromMaster($connection, $this->connectionFactory);
} elseif ($connection = $this->pickSlave()) {
$this->discoverFromSlave($connection, $this->connectionFactory);
} else {
throw new ClientException('No connection available for discovery');
}
} catch (ConnectionException $exception) {
$this->remove($connection);
goto RETRY_FETCH;
}
}
}
/**
* Discovers the replication configuration by contacting the master node.
*
* @param NodeConnectionInterface $connection Connection to the master node.
* @param FactoryInterface $connectionFactory Connection factory instance.
*/
protected function discoverFromMaster(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
{
$response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
$replication = $this->handleInfoResponse($response);
if ($replication['role'] !== 'master') {
throw new ClientException("Role mismatch (expected master, got slave) [$connection]");
}
$this->slaves = array();
foreach ($replication as $k => $v) {
$parameters = null;
if (strpos($k, 'slave') === 0 && preg_match('/ip=(?P<host>.*),port=(?P<port>\d+)/', $v, $parameters)) {
$slaveConnection = $connectionFactory->create(array(
'host' => $parameters['host'],
'port' => $parameters['port'],
));
$this->add($slaveConnection);
}
}
}
/**
* Discovers the replication configuration by contacting one of the slaves.
*
* @param NodeConnectionInterface $connection Connection to one of the slaves.
* @param FactoryInterface $connectionFactory Connection factory instance.
*/
protected function discoverFromSlave(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
{
$response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
$replication = $this->handleInfoResponse($response);
if ($replication['role'] !== 'slave') {
throw new ClientException("Role mismatch (expected slave, got master) [$connection]");
}
$masterConnection = $connectionFactory->create(array(
'host' => $replication['master_host'],
'port' => $replication['master_port'],
'alias' => 'master',
));
$this->add($masterConnection);
$this->discoverFromMaster($masterConnection, $connectionFactory);
}
/**
* Retries the execution of a command upon slave failure.
*
* @param CommandInterface $command Command instance.
* @param string $method Actual method.
*
* @return mixed
*/
private function retryCommandOnFailure(CommandInterface $command, $method)
{
RETRY_COMMAND: {
try {
$connection = $this->getConnection($command);
$response = $connection->$method($command);
if ($response instanceof ResponseErrorInterface && $response->getErrorType() === 'LOADING') {
throw new ConnectionException($connection, "Redis is loading the dataset in memory [$connection]");
}
} catch (ConnectionException $exception) {
$connection = $exception->getConnection();
$connection->disconnect();
if ($connection === $this->master && !$this->autoDiscovery) {
// Throw immediately when master connection is failing, even
// when the command represents a read-only operation, unless
// automatic discovery has been enabled.
throw $exception;
} else {
// Otherwise remove the failing slave and attempt to execute
// the command again on one of the remaining slaves...
$this->remove($connection);
}
// ... that is, unless we have no more connections to use.
if (!$this->slaves && !$this->master) {
throw $exception;
} elseif ($this->autoDiscovery) {
$this->discover();
}
goto RETRY_COMMAND;
} catch (MissingMasterException $exception) {
if ($this->autoDiscovery) {
$this->discover();
} else {
throw $exception;
}
goto RETRY_COMMAND;
}
}
return $response;
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
return $this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
return $this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array('master', 'slaves', 'strategy');
}
}

View File

@@ -0,0 +1,235 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\Cluster\PredisStrategy;
use Predis\Cluster\StrategyInterface;
use Predis\Command\CommandInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\NotSupportedException;
/**
* Abstraction for a cluster of aggregate connections to various Redis servers
* implementing client-side sharding based on pluggable distribution strategies.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*
* @todo Add the ability to remove connections from pool.
*/
class PredisCluster implements ClusterInterface, \IteratorAggregate, \Countable
{
private $pool;
private $strategy;
private $distributor;
/**
* @param StrategyInterface $strategy Optional cluster strategy.
*/
public function __construct(StrategyInterface $strategy = null)
{
$this->pool = array();
$this->strategy = $strategy ?: new PredisStrategy();
$this->distributor = $this->strategy->getDistributor();
}
/**
* {@inheritdoc}
*/
public function isConnected()
{
foreach ($this->pool as $connection) {
if ($connection->isConnected()) {
return true;
}
}
return false;
}
/**
* {@inheritdoc}
*/
public function connect()
{
foreach ($this->pool as $connection) {
$connection->connect();
}
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
foreach ($this->pool as $connection) {
$connection->disconnect();
}
}
/**
* {@inheritdoc}
*/
public function add(NodeConnectionInterface $connection)
{
$parameters = $connection->getParameters();
if (isset($parameters->alias)) {
$this->pool[$parameters->alias] = $connection;
} else {
$this->pool[] = $connection;
}
$weight = isset($parameters->weight) ? $parameters->weight : null;
$this->distributor->add($connection, $weight);
}
/**
* {@inheritdoc}
*/
public function remove(NodeConnectionInterface $connection)
{
if (($id = array_search($connection, $this->pool, true)) !== false) {
unset($this->pool[$id]);
$this->distributor->remove($connection);
return true;
}
return false;
}
/**
* Removes a connection instance using its alias or index.
*
* @param string $connectionID Alias or index of a connection.
*
* @return bool Returns true if the connection was in the pool.
*/
public function removeById($connectionID)
{
if ($connection = $this->getConnectionById($connectionID)) {
return $this->remove($connection);
}
return false;
}
/**
* {@inheritdoc}
*/
public function getConnection(CommandInterface $command)
{
$slot = $this->strategy->getSlot($command);
if (!isset($slot)) {
throw new NotSupportedException(
"Cannot use '{$command->getId()}' over clusters of connections."
);
}
$node = $this->distributor->getBySlot($slot);
return $node;
}
/**
* {@inheritdoc}
*/
public function getConnectionById($connectionID)
{
return isset($this->pool[$connectionID]) ? $this->pool[$connectionID] : null;
}
/**
* Retrieves a connection instance from the cluster using a key.
*
* @param string $key Key string.
*
* @return NodeConnectionInterface
*/
public function getConnectionByKey($key)
{
$hash = $this->strategy->getSlotByKey($key);
$node = $this->distributor->getBySlot($hash);
return $node;
}
/**
* Returns the underlying command hash strategy used to hash commands by
* using keys found in their arguments.
*
* @return StrategyInterface
*/
public function getClusterStrategy()
{
return $this->strategy;
}
/**
* {@inheritdoc}
*/
public function count()
{
return count($this->pool);
}
/**
* {@inheritdoc}
*/
public function getIterator()
{
return new \ArrayIterator($this->pool);
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->getConnection($command)->writeRequest($command);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
return $this->getConnection($command)->readResponse($command);
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
return $this->getConnection($command)->executeCommand($command);
}
/**
* Executes the specified Redis command on all the nodes of a cluster.
*
* @param CommandInterface $command A Redis command.
*
* @return array
*/
public function executeCommandOnNodes(CommandInterface $command)
{
$responses = array();
foreach ($this->pool as $connection) {
$responses[] = $connection->executeCommand($command);
}
return $responses;
}
}

View File

@@ -0,0 +1,673 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\ClientException;
use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
use Predis\Cluster\StrategyInterface;
use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\Connection\ConnectionException;
use Predis\Connection\FactoryInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\NotSupportedException;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
/**
* Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
*
* This connection backend offers smart support for redis-cluster by handling
* automatic slots map (re)generation upon -MOVED or -ASK responses returned by
* Redis when redirecting a client to a different node.
*
* The cluster can be pre-initialized using only a subset of the actual nodes in
* the cluster, Predis will do the rest by adjusting the slots map and creating
* the missing underlying connection instances on the fly.
*
* It is possible to pre-associate connections to a slots range with the "slots"
* parameter in the form "$first-$last". This can greatly reduce runtime node
* guessing and redirections.
*
* It is also possible to ask for the full and updated slots map directly to one
* of the nodes and optionally enable such a behaviour upon -MOVED redirections.
* Asking for the cluster configuration to Redis is actually done by issuing a
* CLUSTER SLOTS command to a random node in the pool.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
{
private $useClusterSlots = true;
private $pool = array();
private $slots = array();
private $slotsMap;
private $strategy;
private $connections;
private $retryLimit = 5;
/**
* @param FactoryInterface $connections Optional connection factory.
* @param StrategyInterface $strategy Optional cluster strategy.
*/
public function __construct(
FactoryInterface $connections,
StrategyInterface $strategy = null
) {
$this->connections = $connections;
$this->strategy = $strategy ?: new RedisClusterStrategy();
}
/**
* Sets the maximum number of retries for commands upon server failure.
*
* -1 = unlimited retry attempts
* 0 = no retry attempts (fails immediatly)
* n = fail only after n retry attempts
*
* @param int $retry Number of retry attempts.
*/
public function setRetryLimit($retry)
{
$this->retryLimit = (int) $retry;
}
/**
* {@inheritdoc}
*/
public function isConnected()
{
foreach ($this->pool as $connection) {
if ($connection->isConnected()) {
return true;
}
}
return false;
}
/**
* {@inheritdoc}
*/
public function connect()
{
if ($connection = $this->getRandomConnection()) {
$connection->connect();
}
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
foreach ($this->pool as $connection) {
$connection->disconnect();
}
}
/**
* {@inheritdoc}
*/
public function add(NodeConnectionInterface $connection)
{
$this->pool[(string) $connection] = $connection;
unset($this->slotsMap);
}
/**
* {@inheritdoc}
*/
public function remove(NodeConnectionInterface $connection)
{
if (false !== $id = array_search($connection, $this->pool, true)) {
unset(
$this->pool[$id],
$this->slotsMap
);
$this->slots = array_diff($this->slots, array($connection));
return true;
}
return false;
}
/**
* Removes a connection instance by using its identifier.
*
* @param string $connectionID Connection identifier.
*
* @return bool True if the connection was in the pool.
*/
public function removeById($connectionID)
{
if (isset($this->pool[$connectionID])) {
unset(
$this->pool[$connectionID],
$this->slotsMap
);
return true;
}
return false;
}
/**
* Generates the current slots map by guessing the cluster configuration out
* of the connection parameters of the connections in the pool.
*
* Generation is based on the same algorithm used by Redis to generate the
* cluster, so it is most effective when all of the connections supplied on
* initialization have the "slots" parameter properly set accordingly to the
* current cluster configuration.
*
* @return array
*/
public function buildSlotsMap()
{
$this->slotsMap = array();
foreach ($this->pool as $connectionID => $connection) {
$parameters = $connection->getParameters();
if (!isset($parameters->slots)) {
continue;
}
foreach (explode(',', $parameters->slots) as $slotRange) {
$slots = explode('-', $slotRange, 2);
if (!isset($slots[1])) {
$slots[1] = $slots[0];
}
$this->setSlots($slots[0], $slots[1], $connectionID);
}
}
return $this->slotsMap;
}
/**
* Queries the specified node of the cluster to fetch the updated slots map.
*
* When the connection fails, this method tries to execute the same command
* on a different connection picked at random from the pool of known nodes,
* up until the retry limit is reached.
*
* @param NodeConnectionInterface $connection Connection to a node of the cluster.
*
* @return mixed
*/
private function queryClusterNodeForSlotsMap(NodeConnectionInterface $connection)
{
$retries = 0;
$command = RawCommand::create('CLUSTER', 'SLOTS');
RETRY_COMMAND: {
try {
$response = $connection->executeCommand($command);
} catch (ConnectionException $exception) {
$connection = $exception->getConnection();
$connection->disconnect();
$this->remove($connection);
if ($retries === $this->retryLimit) {
throw $exception;
}
if (!$connection = $this->getRandomConnection()) {
throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
}
++$retries;
goto RETRY_COMMAND;
}
}
return $response;
}
/**
* Generates an updated slots map fetching the cluster configuration using
* the CLUSTER SLOTS command against the specified node or a random one from
* the pool.
*
* @param NodeConnectionInterface $connection Optional connection instance.
*
* @return array
*/
public function askSlotsMap(NodeConnectionInterface $connection = null)
{
if (!$connection && !$connection = $this->getRandomConnection()) {
return array();
}
$this->resetSlotsMap();
$response = $this->queryClusterNodeForSlotsMap($connection);
foreach ($response as $slots) {
// We only support master servers for now, so we ignore subsequent
// elements in the $slots array identifying slaves.
list($start, $end, $master) = $slots;
if ($master[0] === '') {
$this->setSlots($start, $end, (string) $connection);
} else {
$this->setSlots($start, $end, "{$master[0]}:{$master[1]}");
}
}
return $this->slotsMap;
}
/**
* Resets the slots map cache.
*/
public function resetSlotsMap()
{
$this->slotsMap = array();
}
/**
* Returns the current slots map for the cluster.
*
* The order of the returned $slot => $server dictionary is not guaranteed.
*
* @return array
*/
public function getSlotsMap()
{
if (!isset($this->slotsMap)) {
$this->slotsMap = array();
}
return $this->slotsMap;
}
/**
* Pre-associates a connection to a slots range to avoid runtime guessing.
*
* @param int $first Initial slot of the range.
* @param int $last Last slot of the range.
* @param NodeConnectionInterface|string $connection ID or connection instance.
*
* @throws \OutOfBoundsException
*/
public function setSlots($first, $last, $connection)
{
if ($first < 0x0000 || $first > 0x3FFF ||
$last < 0x0000 || $last > 0x3FFF ||
$last < $first
) {
throw new \OutOfBoundsException(
"Invalid slot range for $connection: [$first-$last]."
);
}
$slots = array_fill($first, $last - $first + 1, (string) $connection);
$this->slotsMap = $this->getSlotsMap() + $slots;
}
/**
* Guesses the correct node associated to a given slot using a precalculated
* slots map, falling back to the same logic used by Redis to initialize a
* cluster (best-effort).
*
* @param int $slot Slot index.
*
* @return string Connection ID.
*/
protected function guessNode($slot)
{
if (!$this->pool) {
throw new ClientException('No connections available in the pool');
}
if (!isset($this->slotsMap)) {
$this->buildSlotsMap();
}
if (isset($this->slotsMap[$slot])) {
return $this->slotsMap[$slot];
}
$count = count($this->pool);
$index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
$nodes = array_keys($this->pool);
return $nodes[$index];
}
/**
* Creates a new connection instance from the given connection ID.
*
* @param string $connectionID Identifier for the connection.
*
* @return NodeConnectionInterface
*/
protected function createConnection($connectionID)
{
$separator = strrpos($connectionID, ':');
return $this->connections->create(array(
'host' => substr($connectionID, 0, $separator),
'port' => substr($connectionID, $separator + 1),
));
}
/**
* {@inheritdoc}
*/
public function getConnection(CommandInterface $command)
{
$slot = $this->strategy->getSlot($command);
if (!isset($slot)) {
throw new NotSupportedException(
"Cannot use '{$command->getId()}' with redis-cluster."
);
}
if (isset($this->slots[$slot])) {
return $this->slots[$slot];
} else {
return $this->getConnectionBySlot($slot);
}
}
/**
* Returns the connection currently associated to a given slot.
*
* @param int $slot Slot index.
*
* @throws \OutOfBoundsException
*
* @return NodeConnectionInterface
*/
public function getConnectionBySlot($slot)
{
if ($slot < 0x0000 || $slot > 0x3FFF) {
throw new \OutOfBoundsException("Invalid slot [$slot].");
}
if (isset($this->slots[$slot])) {
return $this->slots[$slot];
}
$connectionID = $this->guessNode($slot);
if (!$connection = $this->getConnectionById($connectionID)) {
$connection = $this->createConnection($connectionID);
$this->pool[$connectionID] = $connection;
}
return $this->slots[$slot] = $connection;
}
/**
* {@inheritdoc}
*/
public function getConnectionById($connectionID)
{
if (isset($this->pool[$connectionID])) {
return $this->pool[$connectionID];
}
}
/**
* Returns a random connection from the pool.
*
* @return NodeConnectionInterface|null
*/
protected function getRandomConnection()
{
if ($this->pool) {
return $this->pool[array_rand($this->pool)];
}
}
/**
* Permanently associates the connection instance to a new slot.
* The connection is added to the connections pool if not yet included.
*
* @param NodeConnectionInterface $connection Connection instance.
* @param int $slot Target slot index.
*/
protected function move(NodeConnectionInterface $connection, $slot)
{
$this->pool[(string) $connection] = $connection;
$this->slots[(int) $slot] = $connection;
}
/**
* Handles -ERR responses returned by Redis.
*
* @param CommandInterface $command Command that generated the -ERR response.
* @param ErrorResponseInterface $error Redis error response object.
*
* @return mixed
*/
protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
{
$details = explode(' ', $error->getMessage(), 2);
switch ($details[0]) {
case 'MOVED':
return $this->onMovedResponse($command, $details[1]);
case 'ASK':
return $this->onAskResponse($command, $details[1]);
default:
return $error;
}
}
/**
* Handles -MOVED responses by executing again the command against the node
* indicated by the Redis response.
*
* @param CommandInterface $command Command that generated the -MOVED response.
* @param string $details Parameters of the -MOVED response.
*
* @return mixed
*/
protected function onMovedResponse(CommandInterface $command, $details)
{
list($slot, $connectionID) = explode(' ', $details, 2);
if (!$connection = $this->getConnectionById($connectionID)) {
$connection = $this->createConnection($connectionID);
}
if ($this->useClusterSlots) {
$this->askSlotsMap($connection);
}
$this->move($connection, $slot);
$response = $this->executeCommand($command);
return $response;
}
/**
* Handles -ASK responses by executing again the command against the node
* indicated by the Redis response.
*
* @param CommandInterface $command Command that generated the -ASK response.
* @param string $details Parameters of the -ASK response.
*
* @return mixed
*/
protected function onAskResponse(CommandInterface $command, $details)
{
list($slot, $connectionID) = explode(' ', $details, 2);
if (!$connection = $this->getConnectionById($connectionID)) {
$connection = $this->createConnection($connectionID);
}
$connection->executeCommand(RawCommand::create('ASKING'));
$response = $connection->executeCommand($command);
return $response;
}
/**
* Ensures that a command is executed one more time on connection failure.
*
* The connection to the node that generated the error is evicted from the
* pool before trying to fetch an updated slots map from another node. If
* the new slots map points to an unreachable server the client gives up and
* throws the exception as the nodes participating in the cluster may still
* have to agree that something changed in the configuration of the cluster.
*
* @param CommandInterface $command Command instance.
* @param string $method Actual method.
*
* @return mixed
*/
private function retryCommandOnFailure(CommandInterface $command, $method)
{
$failure = false;
RETRY_COMMAND: {
try {
$response = $this->getConnection($command)->$method($command);
} catch (ConnectionException $exception) {
$connection = $exception->getConnection();
$connection->disconnect();
$this->remove($connection);
if ($failure) {
throw $exception;
} elseif ($this->useClusterSlots) {
$this->askSlotsMap();
}
$failure = true;
goto RETRY_COMMAND;
}
}
return $response;
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
return $this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
$response = $this->retryCommandOnFailure($command, __FUNCTION__);
if ($response instanceof ErrorResponseInterface) {
return $this->onErrorResponse($command, $response);
}
return $response;
}
/**
* {@inheritdoc}
*/
public function count()
{
return count($this->pool);
}
/**
* {@inheritdoc}
*/
public function getIterator()
{
if ($this->useClusterSlots) {
$slotsmap = $this->getSlotsMap() ?: $this->askSlotsMap();
} else {
$slotsmap = $this->getSlotsMap() ?: $this->buildSlotsMap();
}
$connections = array();
foreach (array_unique($slotsmap) as $node) {
if (!$connection = $this->getConnectionById($node)) {
$this->add($connection = $this->createConnection($node));
}
$connections[] = $connection;
}
return new \ArrayIterator($connections);
}
/**
* Returns the underlying command hash strategy used to hash commands by
* using keys found in their arguments.
*
* @return StrategyInterface
*/
public function getClusterStrategy()
{
return $this->strategy;
}
/**
* Returns the underlying connection factory used to create new connection
* instances to Redis nodes indicated by redis-cluster.
*
* @return FactoryInterface
*/
public function getConnectionFactory()
{
return $this->connections;
}
/**
* Enables automatic fetching of the current slots map from one of the nodes
* using the CLUSTER SLOTS command. This option is enabled by default as
* asking the current slots map to Redis upon -MOVED responses may reduce
* overhead by eliminating the trial-and-error nature of the node guessing
* procedure, mostly when targeting many keys that would end up in a lot of
* redirections.
*
* The slots map can still be manually fetched using the askSlotsMap()
* method whether or not this option is enabled.
*
* @param bool $value Enable or disable the use of CLUSTER SLOTS.
*/
public function useClusterSlots($value)
{
$this->useClusterSlots = (bool) $value;
}
}

View File

@@ -0,0 +1,52 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\Connection\AggregateConnectionInterface;
use Predis\Connection\NodeConnectionInterface;
/**
* Defines a group of Redis nodes in a master / slave replication setup.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface ReplicationInterface extends AggregateConnectionInterface
{
/**
* Switches the internal connection instance in use.
*
* @param string $connection Alias of a connection
*/
public function switchTo($connection);
/**
* Returns the connection instance currently in use by the aggregate
* connection.
*
* @return NodeConnectionInterface
*/
public function getCurrent();
/**
* Returns the connection instance for the master Redis node.
*
* @return NodeConnectionInterface
*/
public function getMaster();
/**
* Returns a list of connection instances to slave nodes.
*
* @return NodeConnectionInterface
*/
public function getSlaves();
}

View File

@@ -0,0 +1,720 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Aggregate;
use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\CommunicationException;
use Predis\Connection\ConnectionException;
use Predis\Connection\FactoryInterface as ConnectionFactoryInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\Connection\Parameters;
use Predis\Replication\ReplicationStrategy;
use Predis\Replication\RoleException;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\ServerException;
/**
* @author Daniele Alessandri <suppakilla@gmail.com>
* @author Ville Mattila <ville@eventio.fi>
*/
class SentinelReplication implements ReplicationInterface
{
/**
* @var NodeConnectionInterface
*/
protected $master;
/**
* @var NodeConnectionInterface[]
*/
protected $slaves = array();
/**
* @var NodeConnectionInterface
*/
protected $current;
/**
* @var string
*/
protected $service;
/**
* @var ConnectionFactoryInterface
*/
protected $connectionFactory;
/**
* @var ReplicationStrategy
*/
protected $strategy;
/**
* @var NodeConnectionInterface[]
*/
protected $sentinels = array();
/**
* @var NodeConnectionInterface
*/
protected $sentinelConnection;
/**
* @var float
*/
protected $sentinelTimeout = 0.100;
/**
* Max number of automatic retries of commands upon server failure.
*
* -1 = unlimited retry attempts
* 0 = no retry attempts (fails immediatly)
* n = fail only after n retry attempts
*
* @var int
*/
protected $retryLimit = 20;
/**
* Time to wait in milliseconds before fetching a new configuration from one
* of the sentinel servers.
*
* @var int
*/
protected $retryWait = 1000;
/**
* Flag for automatic fetching of available sentinels.
*
* @var bool
*/
protected $updateSentinels = false;
/**
* @param string $service Name of the service for autodiscovery.
* @param array $sentinels Sentinel servers connection parameters.
* @param ConnectionFactoryInterface $connectionFactory Connection factory instance.
* @param ReplicationStrategy $strategy Replication strategy instance.
*/
public function __construct(
$service,
array $sentinels,
ConnectionFactoryInterface $connectionFactory,
ReplicationStrategy $strategy = null
) {
$this->sentinels = $sentinels;
$this->service = $service;
$this->connectionFactory = $connectionFactory;
$this->strategy = $strategy ?: new ReplicationStrategy();
}
/**
* Sets a default timeout for connections to sentinels.
*
* When "timeout" is present in the connection parameters of sentinels, its
* value overrides the default sentinel timeout.
*
* @param float $timeout Timeout value.
*/
public function setSentinelTimeout($timeout)
{
$this->sentinelTimeout = (float) $timeout;
}
/**
* Sets the maximum number of retries for commands upon server failure.
*
* -1 = unlimited retry attempts
* 0 = no retry attempts (fails immediatly)
* n = fail only after n retry attempts
*
* @param int $retry Number of retry attempts.
*/
public function setRetryLimit($retry)
{
$this->retryLimit = (int) $retry;
}
/**
* Sets the time to wait (in seconds) before fetching a new configuration
* from one of the sentinels.
*
* @param float $seconds Time to wait before the next attempt.
*/
public function setRetryWait($seconds)
{
$this->retryWait = (float) $seconds;
}
/**
* Set automatic fetching of available sentinels.
*
* @param bool $update Enable or disable automatic updates.
*/
public function setUpdateSentinels($update)
{
$this->updateSentinels = (bool) $update;
}
/**
* Resets the current connection.
*/
protected function reset()
{
$this->current = null;
}
/**
* Wipes the current list of master and slaves nodes.
*/
protected function wipeServerList()
{
$this->reset();
$this->master = null;
$this->slaves = array();
}
/**
* {@inheritdoc}
*/
public function add(NodeConnectionInterface $connection)
{
$alias = $connection->getParameters()->alias;
if ($alias === 'master') {
$this->master = $connection;
} else {
$this->slaves[$alias ?: count($this->slaves)] = $connection;
}
$this->reset();
}
/**
* {@inheritdoc}
*/
public function remove(NodeConnectionInterface $connection)
{
if ($connection === $this->master) {
$this->master = null;
$this->reset();
return true;
}
if (false !== $id = array_search($connection, $this->slaves, true)) {
unset($this->slaves[$id]);
$this->reset();
return true;
}
return false;
}
/**
* Creates a new connection to a sentinel server.
*
* @return NodeConnectionInterface
*/
protected function createSentinelConnection($parameters)
{
if ($parameters instanceof NodeConnectionInterface) {
return $parameters;
}
if (is_string($parameters)) {
$parameters = Parameters::parse($parameters);
}
if (is_array($parameters)) {
// We explicitly set "database" and "password" to null,
// so that no AUTH and SELECT command is send to the sentinels.
$parameters['database'] = null;
$parameters['password'] = null;
if (!isset($parameters['timeout'])) {
$parameters['timeout'] = $this->sentinelTimeout;
}
}
$connection = $this->connectionFactory->create($parameters);
return $connection;
}
/**
* Returns the current sentinel connection.
*
* If there is no active sentinel connection, a new connection is created.
*
* @return NodeConnectionInterface
*/
public function getSentinelConnection()
{
if (!$this->sentinelConnection) {
if (!$this->sentinels) {
throw new \Predis\ClientException('No sentinel server available for autodiscovery.');
}
$sentinel = array_shift($this->sentinels);
$this->sentinelConnection = $this->createSentinelConnection($sentinel);
}
return $this->sentinelConnection;
}
/**
* Fetches an updated list of sentinels from a sentinel.
*/
public function updateSentinels()
{
SENTINEL_QUERY: {
$sentinel = $this->getSentinelConnection();
try {
$payload = $sentinel->executeCommand(
RawCommand::create('SENTINEL', 'sentinels', $this->service)
);
$this->sentinels = array();
// NOTE: sentinel server does not return itself, so we add it back.
$this->sentinels[] = $sentinel->getParameters()->toArray();
foreach ($payload as $sentinel) {
$this->sentinels[] = array(
'host' => $sentinel[3],
'port' => $sentinel[5],
);
}
} catch (ConnectionException $exception) {
$this->sentinelConnection = null;
goto SENTINEL_QUERY;
}
}
}
/**
* Fetches the details for the master and slave servers from a sentinel.
*/
public function querySentinel()
{
$this->wipeServerList();
$this->updateSentinels();
$this->getMaster();
$this->getSlaves();
}
/**
* Handles error responses returned by redis-sentinel.
*
* @param NodeConnectionInterface $sentinel Connection to a sentinel server.
* @param ErrorResponseInterface $error Error response.
*/
private function handleSentinelErrorResponse(NodeConnectionInterface $sentinel, ErrorResponseInterface $error)
{
if ($error->getErrorType() === 'IDONTKNOW') {
throw new ConnectionException($sentinel, $error->getMessage());
} else {
throw new ServerException($error->getMessage());
}
}
/**
* Fetches the details for the master server from a sentinel.
*
* @param NodeConnectionInterface $sentinel Connection to a sentinel server.
* @param string $service Name of the service.
*
* @return array
*/
protected function querySentinelForMaster(NodeConnectionInterface $sentinel, $service)
{
$payload = $sentinel->executeCommand(
RawCommand::create('SENTINEL', 'get-master-addr-by-name', $service)
);
if ($payload === null) {
throw new ServerException('ERR No such master with that name');
}
if ($payload instanceof ErrorResponseInterface) {
$this->handleSentinelErrorResponse($sentinel, $payload);
}
return array(
'host' => $payload[0],
'port' => $payload[1],
'alias' => 'master',
);
}
/**
* Fetches the details for the slave servers from a sentinel.
*
* @param NodeConnectionInterface $sentinel Connection to a sentinel server.
* @param string $service Name of the service.
*
* @return array
*/
protected function querySentinelForSlaves(NodeConnectionInterface $sentinel, $service)
{
$slaves = array();
$payload = $sentinel->executeCommand(
RawCommand::create('SENTINEL', 'slaves', $service)
);
if ($payload instanceof ErrorResponseInterface) {
$this->handleSentinelErrorResponse($sentinel, $payload);
}
foreach ($payload as $slave) {
$flags = explode(',', $slave[9]);
if (array_intersect($flags, array('s_down', 'o_down', 'disconnected'))) {
continue;
}
$slaves[] = array(
'host' => $slave[3],
'port' => $slave[5],
'alias' => "slave-$slave[1]",
);
}
return $slaves;
}
/**
* {@inheritdoc}
*/
public function getCurrent()
{
return $this->current;
}
/**
* {@inheritdoc}
*/
public function getMaster()
{
if ($this->master) {
return $this->master;
}
if ($this->updateSentinels) {
$this->updateSentinels();
}
SENTINEL_QUERY: {
$sentinel = $this->getSentinelConnection();
try {
$masterParameters = $this->querySentinelForMaster($sentinel, $this->service);
$masterConnection = $this->connectionFactory->create($masterParameters);
$this->add($masterConnection);
} catch (ConnectionException $exception) {
$this->sentinelConnection = null;
goto SENTINEL_QUERY;
}
}
return $masterConnection;
}
/**
* {@inheritdoc}
*/
public function getSlaves()
{
if ($this->slaves) {
return array_values($this->slaves);
}
if ($this->updateSentinels) {
$this->updateSentinels();
}
SENTINEL_QUERY: {
$sentinel = $this->getSentinelConnection();
try {
$slavesParameters = $this->querySentinelForSlaves($sentinel, $this->service);
foreach ($slavesParameters as $slaveParameters) {
$this->add($this->connectionFactory->create($slaveParameters));
}
} catch (ConnectionException $exception) {
$this->sentinelConnection = null;
goto SENTINEL_QUERY;
}
}
return array_values($this->slaves ?: array());
}
/**
* Returns a random slave.
*
* @return NodeConnectionInterface
*/
protected function pickSlave()
{
if ($slaves = $this->getSlaves()) {
return $slaves[rand(1, count($slaves)) - 1];
}
}
/**
* Returns the connection instance in charge for the given command.
*
* @param CommandInterface $command Command instance.
*
* @return NodeConnectionInterface
*/
private function getConnectionInternal(CommandInterface $command)
{
if (!$this->current) {
if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
$this->current = $slave;
} else {
$this->current = $this->getMaster();
}
return $this->current;
}
if ($this->current === $this->master) {
return $this->current;
}
if (!$this->strategy->isReadOperation($command)) {
$this->current = $this->getMaster();
}
return $this->current;
}
/**
* Asserts that the specified connection matches an expected role.
*
* @param NodeConnectionInterface $sentinel Connection to a redis server.
* @param string $role Expected role of the server ("master", "slave" or "sentinel").
*/
protected function assertConnectionRole(NodeConnectionInterface $connection, $role)
{
$role = strtolower($role);
$actualRole = $connection->executeCommand(RawCommand::create('ROLE'));
if ($role !== $actualRole[0]) {
throw new RoleException($connection, "Expected $role but got $actualRole[0] [$connection]");
}
}
/**
* {@inheritdoc}
*/
public function getConnection(CommandInterface $command)
{
$connection = $this->getConnectionInternal($command);
if (!$connection->isConnected()) {
// When we do not have any available slave in the pool we can expect
// read-only operations to hit the master server.
$expectedRole = $this->strategy->isReadOperation($command) && $this->slaves ? 'slave' : 'master';
$this->assertConnectionRole($connection, $expectedRole);
}
return $connection;
}
/**
* {@inheritdoc}
*/
public function getConnectionById($connectionId)
{
if ($connectionId === 'master') {
return $this->getMaster();
}
$this->getSlaves();
if (isset($this->slaves[$connectionId])) {
return $this->slaves[$connectionId];
}
}
/**
* {@inheritdoc}
*/
public function switchTo($connection)
{
if (!$connection instanceof NodeConnectionInterface) {
$connection = $this->getConnectionById($connection);
}
if ($connection && $connection === $this->current) {
return;
}
if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
throw new \InvalidArgumentException('Invalid connection or connection not found.');
}
$connection->connect();
if ($this->current) {
$this->current->disconnect();
}
$this->current = $connection;
}
/**
* Switches to the master server.
*/
public function switchToMaster()
{
$this->switchTo('master');
}
/**
* Switches to a random slave server.
*/
public function switchToSlave()
{
$connection = $this->pickSlave();
$this->switchTo($connection);
}
/**
* {@inheritdoc}
*/
public function isConnected()
{
return $this->current ? $this->current->isConnected() : false;
}
/**
* {@inheritdoc}
*/
public function connect()
{
if (!$this->current) {
if (!$this->current = $this->pickSlave()) {
$this->current = $this->getMaster();
}
}
$this->current->connect();
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
if ($this->master) {
$this->master->disconnect();
}
foreach ($this->slaves as $connection) {
$connection->disconnect();
}
}
/**
* Retries the execution of a command upon server failure after asking a new
* configuration to one of the sentinels.
*
* @param CommandInterface $command Command instance.
* @param string $method Actual method.
*
* @return mixed
*/
private function retryCommandOnFailure(CommandInterface $command, $method)
{
$retries = 0;
SENTINEL_RETRY: {
try {
$response = $this->getConnection($command)->$method($command);
} catch (CommunicationException $exception) {
$this->wipeServerList();
$exception->getConnection()->disconnect();
if ($retries == $this->retryLimit) {
throw $exception;
}
usleep($this->retryWait * 1000);
++$retries;
goto SENTINEL_RETRY;
}
}
return $response;
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
return $this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
return $this->retryCommandOnFailure($command, __FUNCTION__);
}
/**
* Returns the underlying replication strategy.
*
* @return ReplicationStrategy
*/
public function getReplicationStrategy()
{
return $this->strategy;
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array(
'master', 'slaves', 'service', 'sentinels', 'connectionFactory', 'strategy',
);
}
}

View File

@@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
/**
* Defines a virtual connection composed of multiple connection instances to
* single Redis nodes.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface AggregateConnectionInterface extends ConnectionInterface
{
/**
* Adds a connection instance to the aggregate connection.
*
* @param NodeConnectionInterface $connection Connection instance.
*/
public function add(NodeConnectionInterface $connection);
/**
* Removes the specified connection instance from the aggregate connection.
*
* @param NodeConnectionInterface $connection Connection instance.
*
* @return bool Returns true if the connection was in the pool.
*/
public function remove(NodeConnectionInterface $connection);
/**
* Returns the connection instance in charge for the given command.
*
* @param CommandInterface $command Command instance.
*
* @return NodeConnectionInterface
*/
public function getConnection(CommandInterface $command);
/**
* Returns a connection instance from the aggregate connection by its alias.
*
* @param string $connectionID Connection alias.
*
* @return NodeConnectionInterface|null
*/
public function getConnectionById($connectionID);
}

View File

@@ -0,0 +1,49 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
/**
* Defines a connection to communicate with a single Redis server that leverages
* an external protocol processor to handle pluggable protocol handlers.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface CompositeConnectionInterface extends NodeConnectionInterface
{
/**
* Returns the protocol processor used by the connection.
*/
public function getProtocol();
/**
* Writes the buffer containing over the connection.
*
* @param string $buffer String buffer to be sent over the connection.
*/
public function writeBuffer($buffer);
/**
* Reads the given number of bytes from the connection.
*
* @param int $length Number of bytes to read from the connection.
*
* @return string
*/
public function readBuffer($length);
/**
* Reads a line from the connection.
*
* @param string
*/
public function readLine();
}

View File

@@ -0,0 +1,125 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\Protocol\ProtocolProcessorInterface;
use Predis\Protocol\Text\ProtocolProcessor as TextProtocolProcessor;
/**
* Connection abstraction to Redis servers based on PHP's stream that uses an
* external protocol processor defining the protocol used for the communication.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class CompositeStreamConnection extends StreamConnection implements CompositeConnectionInterface
{
protected $protocol;
/**
* @param ParametersInterface $parameters Initialization parameters for the connection.
* @param ProtocolProcessorInterface $protocol Protocol processor.
*/
public function __construct(
ParametersInterface $parameters,
ProtocolProcessorInterface $protocol = null
) {
$this->parameters = $this->assertParameters($parameters);
$this->protocol = $protocol ?: new TextProtocolProcessor();
}
/**
* {@inheritdoc}
*/
public function getProtocol()
{
return $this->protocol;
}
/**
* {@inheritdoc}
*/
public function writeBuffer($buffer)
{
$this->write($buffer);
}
/**
* {@inheritdoc}
*/
public function readBuffer($length)
{
if ($length <= 0) {
throw new \InvalidArgumentException('Length parameter must be greater than 0.');
}
$value = '';
$socket = $this->getResource();
do {
$chunk = fread($socket, $length);
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading bytes from the server.');
}
$value .= $chunk;
} while (($length -= strlen($chunk)) > 0);
return $value;
}
/**
* {@inheritdoc}
*/
public function readLine()
{
$value = '';
$socket = $this->getResource();
do {
$chunk = fgets($socket);
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading line from the server.');
}
$value .= $chunk;
} while (substr($value, -2) !== "\r\n");
return substr($value, 0, -2);
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->protocol->write($this, $command);
}
/**
* {@inheritdoc}
*/
public function read()
{
return $this->protocol->read($this);
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array_merge(parent::__sleep(), array('protocol'));
}
}

View File

@@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\CommunicationException;
/**
* Exception class that identifies connection-related errors.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class ConnectionException extends CommunicationException
{
}

View File

@@ -0,0 +1,66 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
/**
* Defines a connection object used to communicate with one or multiple
* Redis servers.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface ConnectionInterface
{
/**
* Opens the connection to Redis.
*/
public function connect();
/**
* Closes the connection to Redis.
*/
public function disconnect();
/**
* Checks if the connection to Redis is considered open.
*
* @return bool
*/
public function isConnected();
/**
* Writes the request for the given command over the connection.
*
* @param CommandInterface $command Command instance.
*/
public function writeRequest(CommandInterface $command);
/**
* Reads the response to the given command from the connection.
*
* @param CommandInterface $command Command instance.
*
* @return mixed
*/
public function readResponse(CommandInterface $command);
/**
* Writes a request for the given command over the connection and reads back
* the response returned by Redis.
*
* @param CommandInterface $command Command instance.
*
* @return mixed
*/
public function executeCommand(CommandInterface $command);
}

View File

@@ -0,0 +1,188 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\RawCommand;
/**
* Standard connection factory for creating connections to Redis nodes.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class Factory implements FactoryInterface
{
private $defaults = array();
protected $schemes = array(
'tcp' => 'Predis\Connection\StreamConnection',
'unix' => 'Predis\Connection\StreamConnection',
'tls' => 'Predis\Connection\StreamConnection',
'redis' => 'Predis\Connection\StreamConnection',
'rediss' => 'Predis\Connection\StreamConnection',
'http' => 'Predis\Connection\WebdisConnection',
);
/**
* Checks if the provided argument represents a valid connection class
* implementing Predis\Connection\NodeConnectionInterface. Optionally,
* callable objects are used for lazy initialization of connection objects.
*
* @param mixed $initializer FQN of a connection class or a callable for lazy initialization.
*
* @throws \InvalidArgumentException
*
* @return mixed
*/
protected function checkInitializer($initializer)
{
if (is_callable($initializer)) {
return $initializer;
}
$class = new \ReflectionClass($initializer);
if (!$class->isSubclassOf('Predis\Connection\NodeConnectionInterface')) {
throw new \InvalidArgumentException(
'A connection initializer must be a valid connection class or a callable object.'
);
}
return $initializer;
}
/**
* {@inheritdoc}
*/
public function define($scheme, $initializer)
{
$this->schemes[$scheme] = $this->checkInitializer($initializer);
}
/**
* {@inheritdoc}
*/
public function undefine($scheme)
{
unset($this->schemes[$scheme]);
}
/**
* {@inheritdoc}
*/
public function create($parameters)
{
if (!$parameters instanceof ParametersInterface) {
$parameters = $this->createParameters($parameters);
}
$scheme = $parameters->scheme;
if (!isset($this->schemes[$scheme])) {
throw new \InvalidArgumentException("Unknown connection scheme: '$scheme'.");
}
$initializer = $this->schemes[$scheme];
if (is_callable($initializer)) {
$connection = call_user_func($initializer, $parameters, $this);
} else {
$connection = new $initializer($parameters);
$this->prepareConnection($connection);
}
if (!$connection instanceof NodeConnectionInterface) {
throw new \UnexpectedValueException(
'Objects returned by connection initializers must implement '.
"'Predis\Connection\NodeConnectionInterface'."
);
}
return $connection;
}
/**
* {@inheritdoc}
*/
public function aggregate(AggregateConnectionInterface $connection, array $parameters)
{
foreach ($parameters as $node) {
$connection->add($node instanceof NodeConnectionInterface ? $node : $this->create($node));
}
}
/**
* Assigns a default set of parameters applied to new connections.
*
* The set of parameters passed to create a new connection have precedence
* over the default values set for the connection factory.
*
* @param array $parameters Set of connection parameters.
*/
public function setDefaultParameters(array $parameters)
{
$this->defaults = $parameters;
}
/**
* Returns the default set of parameters applied to new connections.
*
* @return array
*/
public function getDefaultParameters()
{
return $this->defaults;
}
/**
* Creates a connection parameters instance from the supplied argument.
*
* @param mixed $parameters Original connection parameters.
*
* @return ParametersInterface
*/
protected function createParameters($parameters)
{
if (is_string($parameters)) {
$parameters = Parameters::parse($parameters);
} else {
$parameters = $parameters ?: array();
}
if ($this->defaults) {
$parameters += $this->defaults;
}
return new Parameters($parameters);
}
/**
* Prepares a connection instance after its initialization.
*
* @param NodeConnectionInterface $connection Connection instance.
*/
protected function prepareConnection(NodeConnectionInterface $connection)
{
$parameters = $connection->getParameters();
if (isset($parameters->password)) {
$connection->addConnectCommand(
new RawCommand(array('AUTH', $parameters->password))
);
}
if (isset($parameters->database)) {
$connection->addConnectCommand(
new RawCommand(array('SELECT', $parameters->database))
);
}
}
}

View File

@@ -0,0 +1,52 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
/**
* Interface for classes providing a factory of connections to Redis nodes.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface FactoryInterface
{
/**
* Defines or overrides the connection class identified by a scheme prefix.
*
* @param string $scheme Target connection scheme.
* @param mixed $initializer Fully-qualified name of a class or a callable for lazy initialization.
*/
public function define($scheme, $initializer);
/**
* Undefines the connection identified by a scheme prefix.
*
* @param string $scheme Target connection scheme.
*/
public function undefine($scheme);
/**
* Creates a new connection object.
*
* @param mixed $parameters Initialization parameters for the connection.
*
* @return NodeConnectionInterface
*/
public function create($parameters);
/**
* Aggregates single connections into an aggregate connection instance.
*
* @param AggregateConnectionInterface $aggregate Aggregate connection instance.
* @param array $parameters List of parameters for each connection.
*/
public function aggregate(AggregateConnectionInterface $aggregate, array $parameters);
}

View File

@@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
/**
* Defines a connection used to communicate with a single Redis node.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface NodeConnectionInterface extends ConnectionInterface
{
/**
* Returns a string representation of the connection.
*
* @return string
*/
public function __toString();
/**
* Returns the underlying resource used to communicate with Redis.
*
* @return mixed
*/
public function getResource();
/**
* Returns the parameters used to initialize the connection.
*
* @return ParametersInterface
*/
public function getParameters();
/**
* Pushes the given command into a queue of commands executed when
* establishing the actual connection to Redis.
*
* @param CommandInterface $command Instance of a Redis command.
*/
public function addConnectCommand(CommandInterface $command);
/**
* Reads a response from the server.
*
* @return mixed
*/
public function read();
}

View File

@@ -0,0 +1,176 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
/**
* Container for connection parameters used to initialize connections to Redis.
*
* {@inheritdoc}
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class Parameters implements ParametersInterface
{
private $parameters;
private static $defaults = array(
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
);
/**
* @param array $parameters Named array of connection parameters.
*/
public function __construct(array $parameters = array())
{
$this->parameters = $this->filter($parameters) + $this->getDefaults();
}
/**
* Returns some default parameters with their values.
*
* @return array
*/
protected function getDefaults()
{
return self::$defaults;
}
/**
* Creates a new instance by supplying the initial parameters either in the
* form of an URI string or a named array.
*
* @param array|string $parameters Set of connection parameters.
*
* @return Parameters
*/
public static function create($parameters)
{
if (is_string($parameters)) {
$parameters = static::parse($parameters);
}
return new static($parameters ?: array());
}
/**
* Parses an URI string returning an array of connection parameters.
*
* When using the "redis" and "rediss" schemes the URI is parsed according
* to the rules defined by the provisional registration documents approved
* by IANA. If the URI has a password in its "user-information" part or a
* database number in the "path" part these values override the values of
* "password" and "database" if they are present in the "query" part.
*
* @link http://www.iana.org/assignments/uri-schemes/prov/redis
* @link http://www.iana.org/assignments/uri-schemes/prov/rediss
*
* @param string $uri URI string.
*
* @throws \InvalidArgumentException
*
* @return array
*/
public static function parse($uri)
{
if (stripos($uri, 'unix://') === 0) {
// parse_url() can parse unix:/path/to/sock so we do not need the
// unix:///path/to/sock hack, we will support it anyway until 2.0.
$uri = str_ireplace('unix://', 'unix:', $uri);
}
if (!$parsed = parse_url($uri)) {
throw new \InvalidArgumentException("Invalid parameters URI: $uri");
}
if (
isset($parsed['host'])
&& false !== strpos($parsed['host'], '[')
&& false !== strpos($parsed['host'], ']')
) {
$parsed['host'] = substr($parsed['host'], 1, -1);
}
if (isset($parsed['query'])) {
parse_str($parsed['query'], $queryarray);
unset($parsed['query']);
$parsed = array_merge($parsed, $queryarray);
}
if (stripos($uri, 'redis') === 0) {
if (isset($parsed['pass'])) {
$parsed['password'] = $parsed['pass'];
unset($parsed['pass']);
}
if (isset($parsed['path']) && preg_match('/^\/(\d+)(\/.*)?/', $parsed['path'], $path)) {
$parsed['database'] = $path[1];
if (isset($path[2])) {
$parsed['path'] = $path[2];
} else {
unset($parsed['path']);
}
}
}
return $parsed;
}
/**
* Validates and converts each value of the connection parameters array.
*
* @param array $parameters Connection parameters.
*
* @return array
*/
protected function filter(array $parameters)
{
return $parameters ?: array();
}
/**
* {@inheritdoc}
*/
public function __get($parameter)
{
if (isset($this->parameters[$parameter])) {
return $this->parameters[$parameter];
}
}
/**
* {@inheritdoc}
*/
public function __isset($parameter)
{
return isset($this->parameters[$parameter]);
}
/**
* {@inheritdoc}
*/
public function toArray()
{
return $this->parameters;
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array('parameters');
}
}

View File

@@ -0,0 +1,62 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
/**
* Interface defining a container for connection parameters.
*
* The actual list of connection parameters depends on the features supported by
* each connection backend class (please refer to their specific documentation),
* but the most common parameters used through the library are:
*
* @property-read string scheme Connection scheme, such as 'tcp' or 'unix'.
* @property-read string host IP address or hostname of Redis.
* @property-read int port TCP port on which Redis is listening to.
* @property-read string path Path of a UNIX domain socket file.
* @property-read string alias Alias for the connection.
* @property-read float timeout Timeout for the connect() operation.
* @property-read float read_write_timeout Timeout for read() and write() operations.
* @property-read bool async_connect Performs the connect() operation asynchronously.
* @property-read bool tcp_nodelay Toggles the Nagle's algorithm for coalescing.
* @property-read bool persistent Leaves the connection open after a GC collection.
* @property-read string password Password to access Redis (see the AUTH command).
* @property-read string database Database index (see the SELECT command).
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
interface ParametersInterface
{
/**
* Checks if the specified parameters is set.
*
* @param string $parameter Name of the parameter.
*
* @return bool
*/
public function __isset($parameter);
/**
* Returns the value of the specified parameter.
*
* @param string $parameter Name of the parameter.
*
* @return mixed|null
*/
public function __get($parameter);
/**
* Returns an array representation of the connection parameters.
*
* @return array
*/
public function toArray();
}

View File

@@ -0,0 +1,418 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\NotSupportedException;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\Status as StatusResponse;
/**
* This class provides the implementation of a Predis connection that uses the
* PHP socket extension for network communication and wraps the phpiredis C
* extension (PHP bindings for hiredis) to parse the Redis protocol.
*
* This class is intended to provide an optional low-overhead alternative for
* processing responses from Redis compared to the standard pure-PHP classes.
* Differences in speed when dealing with short inline responses are practically
* nonexistent, the actual speed boost is for big multibulk responses when this
* protocol processor can parse and return responses very fast.
*
* For instructions on how to build and install the phpiredis extension, please
* consult the repository of the project.
*
* The connection parameters supported by this class are:
*
* - scheme: it can be either 'redis', 'tcp' or 'unix'.
* - host: hostname or IP address of the server.
* - port: TCP port of the server.
* - path: path of a UNIX domain socket when scheme is 'unix'.
* - timeout: timeout to perform the connection (default is 5 seconds).
* - read_write_timeout: timeout of read / write operations.
*
* @link http://github.com/nrk/phpiredis
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class PhpiredisSocketConnection extends AbstractConnection
{
private $reader;
/**
* {@inheritdoc}
*/
public function __construct(ParametersInterface $parameters)
{
$this->assertExtensions();
parent::__construct($parameters);
$this->reader = $this->createReader();
}
/**
* Disconnects from the server and destroys the underlying resource and the
* protocol reader resource when PHP's garbage collector kicks in.
*/
public function __destruct()
{
phpiredis_reader_destroy($this->reader);
parent::__destruct();
}
/**
* Checks if the socket and phpiredis extensions are loaded in PHP.
*/
protected function assertExtensions()
{
if (!extension_loaded('sockets')) {
throw new NotSupportedException(
'The "sockets" extension is required by this connection backend.'
);
}
if (!extension_loaded('phpiredis')) {
throw new NotSupportedException(
'The "phpiredis" extension is required by this connection backend.'
);
}
}
/**
* {@inheritdoc}
*/
protected function assertParameters(ParametersInterface $parameters)
{
switch ($parameters->scheme) {
case 'tcp':
case 'redis':
case 'unix':
break;
default:
throw new \InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
}
if (isset($parameters->persistent)) {
throw new NotSupportedException(
'Persistent connections are not supported by this connection backend.'
);
}
return $parameters;
}
/**
* Creates a new instance of the protocol reader resource.
*
* @return resource
*/
private function createReader()
{
$reader = phpiredis_reader_create();
phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
return $reader;
}
/**
* Returns the underlying protocol reader resource.
*
* @return resource
*/
protected function getReader()
{
return $this->reader;
}
/**
* Returns the handler used by the protocol reader for inline responses.
*
* @return \Closure
*/
protected function getStatusHandler()
{
static $statusHandler;
if (!$statusHandler) {
$statusHandler = function ($payload) {
return StatusResponse::get($payload);
};
}
return $statusHandler;
}
/**
* Returns the handler used by the protocol reader for error responses.
*
* @return \Closure
*/
protected function getErrorHandler()
{
static $errorHandler;
if (!$errorHandler) {
$errorHandler = function ($errorMessage) {
return new ErrorResponse($errorMessage);
};
}
return $errorHandler;
}
/**
* Helper method used to throw exceptions on socket errors.
*/
private function emitSocketError()
{
$errno = socket_last_error();
$errstr = socket_strerror($errno);
$this->disconnect();
$this->onConnectionError(trim($errstr), $errno);
}
/**
* Gets the address of an host from connection parameters.
*
* @param ParametersInterface $parameters Parameters used to initialize the connection.
*
* @return string
*/
protected static function getAddress(ParametersInterface $parameters)
{
if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
return $host;
}
if ($host === $address = gethostbyname($host)) {
return false;
}
return $address;
}
/**
* {@inheritdoc}
*/
protected function createResource()
{
$parameters = $this->parameters;
if ($parameters->scheme === 'unix') {
$address = $parameters->path;
$domain = AF_UNIX;
$protocol = 0;
} else {
if (false === $address = self::getAddress($parameters)) {
$this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
}
$domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
$protocol = SOL_TCP;
}
$socket = @socket_create($domain, SOCK_STREAM, $protocol);
if (!is_resource($socket)) {
$this->emitSocketError();
}
$this->setSocketOptions($socket, $parameters);
$this->connectWithTimeout($socket, $address, $parameters);
return $socket;
}
/**
* Sets options on the socket resource from the connection parameters.
*
* @param resource $socket Socket resource.
* @param ParametersInterface $parameters Parameters used to initialize the connection.
*/
private function setSocketOptions($socket, ParametersInterface $parameters)
{
if ($parameters->scheme !== 'unix') {
if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
$this->emitSocketError();
}
if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
$this->emitSocketError();
}
}
if (isset($parameters->read_write_timeout)) {
$rwtimeout = (float) $parameters->read_write_timeout;
$timeoutSec = floor($rwtimeout);
$timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
$timeout = array(
'sec' => $timeoutSec,
'usec' => $timeoutUsec,
);
if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
$this->emitSocketError();
}
if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
$this->emitSocketError();
}
}
}
/**
* Opens the actual connection to the server with a timeout.
*
* @param resource $socket Socket resource.
* @param string $address IP address (DNS-resolved from hostname)
* @param ParametersInterface $parameters Parameters used to initialize the connection.
*
* @return string
*/
private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
{
socket_set_nonblock($socket);
if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
$error = socket_last_error();
if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
$this->emitSocketError();
}
}
socket_set_block($socket);
$null = null;
$selectable = array($socket);
$timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
$timeoutSecs = floor($timeout);
$timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
$selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
if ($selected === 2) {
$this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
}
if ($selected === 0) {
$this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
}
if ($selected === false) {
$this->emitSocketError();
}
}
/**
* {@inheritdoc}
*/
public function connect()
{
if (parent::connect() && $this->initCommands) {
foreach ($this->initCommands as $command) {
$response = $this->executeCommand($command);
if ($response instanceof ErrorResponseInterface) {
$this->onConnectionError("`{$command->getId()}` failed: $response", 0);
}
}
}
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
if ($this->isConnected()) {
socket_close($this->getResource());
parent::disconnect();
}
}
/**
* {@inheritdoc}
*/
protected function write($buffer)
{
$socket = $this->getResource();
while (($length = strlen($buffer)) > 0) {
$written = socket_write($socket, $buffer, $length);
if ($length === $written) {
return;
}
if ($written === false) {
$this->onConnectionError('Error while writing bytes to the server.');
}
$buffer = substr($buffer, $written);
}
}
/**
* {@inheritdoc}
*/
public function read()
{
$socket = $this->getResource();
$reader = $this->reader;
while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
$this->emitSocketError();
}
phpiredis_reader_feed($reader, $buffer);
}
if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
return phpiredis_reader_get_reply($reader);
} else {
$this->onProtocolError(phpiredis_reader_get_error($reader));
return;
}
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$arguments = $command->getArguments();
array_unshift($arguments, $command->getId());
$this->write(phpiredis_format_command($arguments));
}
/**
* {@inheritdoc}
*/
public function __wakeup()
{
$this->assertExtensions();
$this->reader = $this->createReader();
}
}

View File

@@ -0,0 +1,238 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\NotSupportedException;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\Status as StatusResponse;
/**
* This class provides the implementation of a Predis connection that uses PHP's
* streams for network communication and wraps the phpiredis C extension (PHP
* bindings for hiredis) to parse and serialize the Redis protocol.
*
* This class is intended to provide an optional low-overhead alternative for
* processing responses from Redis compared to the standard pure-PHP classes.
* Differences in speed when dealing with short inline responses are practically
* nonexistent, the actual speed boost is for big multibulk responses when this
* protocol processor can parse and return responses very fast.
*
* For instructions on how to build and install the phpiredis extension, please
* consult the repository of the project.
*
* The connection parameters supported by this class are:
*
* - scheme: it can be either 'redis', 'tcp' or 'unix'.
* - host: hostname or IP address of the server.
* - port: TCP port of the server.
* - path: path of a UNIX domain socket when scheme is 'unix'.
* - timeout: timeout to perform the connection.
* - read_write_timeout: timeout of read / write operations.
* - async_connect: performs the connection asynchronously.
* - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
* - persistent: the connection is left intact after a GC collection.
*
* @link https://github.com/nrk/phpiredis
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class PhpiredisStreamConnection extends StreamConnection
{
private $reader;
/**
* {@inheritdoc}
*/
public function __construct(ParametersInterface $parameters)
{
$this->assertExtensions();
parent::__construct($parameters);
$this->reader = $this->createReader();
}
/**
* {@inheritdoc}
*/
public function __destruct()
{
phpiredis_reader_destroy($this->reader);
parent::__destruct();
}
/**
* Checks if the phpiredis extension is loaded in PHP.
*/
private function assertExtensions()
{
if (!extension_loaded('phpiredis')) {
throw new NotSupportedException(
'The "phpiredis" extension is required by this connection backend.'
);
}
}
/**
* {@inheritdoc}
*/
protected function assertSslSupport(ParametersInterface $parameters)
{
throw new \InvalidArgumentException('SSL encryption is not supported by this connection backend.');
}
/**
* {@inheritdoc}
*/
protected function createStreamSocket(ParametersInterface $parameters, $address, $flags, $context = null)
{
$socket = null;
$timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags);
if (!$resource) {
$this->onConnectionError(trim($errstr), $errno);
}
if (isset($parameters->read_write_timeout) && function_exists('socket_import_stream')) {
$rwtimeout = (float) $parameters->read_write_timeout;
$rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
$timeout = array(
'sec' => $timeoutSeconds = floor($rwtimeout),
'usec' => ($rwtimeout - $timeoutSeconds) * 1000000,
);
$socket = $socket ?: socket_import_stream($resource);
@socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout);
@socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout);
}
if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
$socket = $socket ?: socket_import_stream($resource);
socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
}
return $resource;
}
/**
* Creates a new instance of the protocol reader resource.
*
* @return resource
*/
private function createReader()
{
$reader = phpiredis_reader_create();
phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
return $reader;
}
/**
* Returns the underlying protocol reader resource.
*
* @return resource
*/
protected function getReader()
{
return $this->reader;
}
/**
* Returns the handler used by the protocol reader for inline responses.
*
* @return \Closure
*/
protected function getStatusHandler()
{
static $statusHandler;
if (!$statusHandler) {
$statusHandler = function ($payload) {
return StatusResponse::get($payload);
};
}
return $statusHandler;
}
/**
* Returns the handler used by the protocol reader for error responses.
*
* @return \Closure
*/
protected function getErrorHandler()
{
static $errorHandler;
if (!$errorHandler) {
$errorHandler = function ($errorMessage) {
return new ErrorResponse($errorMessage);
};
}
return $errorHandler;
}
/**
* {@inheritdoc}
*/
public function read()
{
$socket = $this->getResource();
$reader = $this->reader;
while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
$buffer = stream_socket_recvfrom($socket, 4096);
if ($buffer === false || $buffer === '') {
$this->onConnectionError('Error while reading bytes from the server.');
}
phpiredis_reader_feed($reader, $buffer);
}
if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
return phpiredis_reader_get_reply($reader);
} else {
$this->onProtocolError(phpiredis_reader_get_error($reader));
return;
}
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$arguments = $command->getArguments();
array_unshift($arguments, $command->getId());
$this->write(phpiredis_format_command($arguments));
}
/**
* {@inheritdoc}
*/
public function __wakeup()
{
$this->assertExtensions();
$this->reader = $this->createReader();
}
}

View File

@@ -0,0 +1,396 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\Status as StatusResponse;
/**
* Standard connection to Redis servers implemented on top of PHP's streams.
* The connection parameters supported by this class are:.
*
* - scheme: it can be either 'redis', 'tcp', 'rediss', 'tls' or 'unix'.
* - host: hostname or IP address of the server.
* - port: TCP port of the server.
* - path: path of a UNIX domain socket when scheme is 'unix'.
* - timeout: timeout to perform the connection (default is 5 seconds).
* - read_write_timeout: timeout of read / write operations.
* - async_connect: performs the connection asynchronously.
* - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
* - persistent: the connection is left intact after a GC collection.
* - ssl: context options array (see http://php.net/manual/en/context.ssl.php)
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class StreamConnection extends AbstractConnection
{
/**
* Disconnects from the server and destroys the underlying resource when the
* garbage collector kicks in only if the connection has not been marked as
* persistent.
*/
public function __destruct()
{
if (isset($this->parameters->persistent) && $this->parameters->persistent) {
return;
}
$this->disconnect();
}
/**
* {@inheritdoc}
*/
protected function assertParameters(ParametersInterface $parameters)
{
switch ($parameters->scheme) {
case 'tcp':
case 'redis':
case 'unix':
break;
case 'tls':
case 'rediss':
$this->assertSslSupport($parameters);
break;
default:
throw new \InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
}
return $parameters;
}
/**
* Checks needed conditions for SSL-encrypted connections.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @throws \InvalidArgumentException
*/
protected function assertSslSupport(ParametersInterface $parameters)
{
if (
filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN) &&
version_compare(PHP_VERSION, '7.0.0beta') < 0
) {
throw new \InvalidArgumentException('Persistent SSL connections require PHP >= 7.0.0.');
}
}
/**
* {@inheritdoc}
*/
protected function createResource()
{
switch ($this->parameters->scheme) {
case 'tcp':
case 'redis':
return $this->tcpStreamInitializer($this->parameters);
case 'unix':
return $this->unixStreamInitializer($this->parameters);
case 'tls':
case 'rediss':
return $this->tlsStreamInitializer($this->parameters);
default:
throw new \InvalidArgumentException("Invalid scheme: '{$this->parameters->scheme}'.");
}
}
/**
* Creates a connected stream socket resource.
*
* @param ParametersInterface $parameters Connection parameters.
* @param string $address Address for stream_socket_client().
* @param int $flags Flags for stream_socket_client().
*
* @return resource
*/
protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
{
$timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
$this->onConnectionError(trim($errstr), $errno);
}
if (isset($parameters->read_write_timeout)) {
$rwtimeout = (float) $parameters->read_write_timeout;
$rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
$timeoutSeconds = floor($rwtimeout);
$timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
}
if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
$socket = socket_import_stream($resource);
socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
}
return $resource;
}
/**
* Initializes a TCP stream resource.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @return resource
*/
protected function tcpStreamInitializer(ParametersInterface $parameters)
{
if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
$address = "tcp://$parameters->host:$parameters->port";
} else {
$address = "tcp://[$parameters->host]:$parameters->port";
}
$flags = STREAM_CLIENT_CONNECT;
if (isset($parameters->async_connect) && $parameters->async_connect) {
$flags |= STREAM_CLIENT_ASYNC_CONNECT;
}
if (isset($parameters->persistent)) {
if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
$flags |= STREAM_CLIENT_PERSISTENT;
if ($persistent === null) {
$address = "{$address}/{$parameters->persistent}";
}
}
}
$resource = $this->createStreamSocket($parameters, $address, $flags);
return $resource;
}
/**
* Initializes a UNIX stream resource.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @return resource
*/
protected function unixStreamInitializer(ParametersInterface $parameters)
{
if (!isset($parameters->path)) {
throw new \InvalidArgumentException('Missing UNIX domain socket path.');
}
$flags = STREAM_CLIENT_CONNECT;
if (isset($parameters->persistent)) {
if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
$flags |= STREAM_CLIENT_PERSISTENT;
if ($persistent === null) {
throw new \InvalidArgumentException(
'Persistent connection IDs are not supported when using UNIX domain sockets.'
);
}
}
}
$resource = $this->createStreamSocket($parameters, "unix://{$parameters->path}", $flags);
return $resource;
}
/**
* Initializes a SSL-encrypted TCP stream resource.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @return resource
*/
protected function tlsStreamInitializer(ParametersInterface $parameters)
{
$resource = $this->tcpStreamInitializer($parameters);
$metadata = stream_get_meta_data($resource);
// Detect if crypto mode is already enabled for this stream (PHP >= 7.0.0).
if (isset($metadata['crypto'])) {
return $resource;
}
if (is_array($parameters->ssl)) {
$options = $parameters->ssl;
} else {
$options = array();
}
if (!isset($options['crypto_type'])) {
$options['crypto_type'] = STREAM_CRYPTO_METHOD_TLS_CLIENT;
}
if (!stream_context_set_option($resource, array('ssl' => $options))) {
$this->onConnectionError('Error while setting SSL context options');
}
if (!stream_socket_enable_crypto($resource, true, $options['crypto_type'])) {
$this->onConnectionError('Error while switching to encrypted communication');
}
return $resource;
}
/**
* {@inheritdoc}
*/
public function connect()
{
if (parent::connect() && $this->initCommands) {
foreach ($this->initCommands as $command) {
$response = $this->executeCommand($command);
if ($response instanceof ErrorResponseInterface) {
$this->onConnectionError("`{$command->getId()}` failed: $response", 0);
}
}
}
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
if ($this->isConnected()) {
fclose($this->getResource());
parent::disconnect();
}
}
/**
* Performs a write operation over the stream of the buffer containing a
* command serialized with the Redis wire protocol.
*
* @param string $buffer Representation of a command in the Redis wire protocol.
*/
protected function write($buffer)
{
$socket = $this->getResource();
while (($length = strlen($buffer)) > 0) {
$written = @fwrite($socket, $buffer);
if ($length === $written) {
return;
}
if ($written === false || $written === 0) {
$this->onConnectionError('Error while writing bytes to the server.');
}
$buffer = substr($buffer, $written);
}
}
/**
* {@inheritdoc}
*/
public function read()
{
$socket = $this->getResource();
$chunk = fgets($socket);
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading line from the server.');
}
$prefix = $chunk[0];
$payload = substr($chunk, 1, -2);
switch ($prefix) {
case '+':
return StatusResponse::get($payload);
case '$':
$size = (int) $payload;
if ($size === -1) {
return;
}
$bulkData = '';
$bytesLeft = ($size += 2);
do {
$chunk = fread($socket, min($bytesLeft, 4096));
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading bytes from the server.');
}
$bulkData .= $chunk;
$bytesLeft = $size - strlen($bulkData);
} while ($bytesLeft > 0);
return substr($bulkData, 0, -2);
case '*':
$count = (int) $payload;
if ($count === -1) {
return;
}
$multibulk = array();
for ($i = 0; $i < $count; ++$i) {
$multibulk[$i] = $this->read();
}
return $multibulk;
case ':':
$integer = (int) $payload;
return $integer == $payload ? $integer : $payload;
case '-':
return new ErrorResponse($payload);
default:
$this->onProtocolError("Unknown response prefix: '$prefix'.");
return;
}
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$commandID = $command->getId();
$arguments = $command->getArguments();
$cmdlen = strlen($commandID);
$reqlen = count($arguments) + 1;
$buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";
foreach ($arguments as $argument) {
$arglen = strlen($argument);
$buffer .= "\${$arglen}\r\n{$argument}\r\n";
}
$this->write($buffer);
}
}

View File

@@ -0,0 +1,366 @@
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection;
use Predis\Command\CommandInterface;
use Predis\NotSupportedException;
use Predis\Protocol\ProtocolException;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\Status as StatusResponse;
/**
* This class implements a Predis connection that actually talks with Webdis
* instead of connecting directly to Redis. It relies on the cURL extension to
* communicate with the web server and the phpiredis extension to parse the
* protocol for responses returned in the http response bodies.
*
* Some features are not yet available or they simply cannot be implemented:
* - Pipelining commands.
* - Publish / Subscribe.
* - MULTI / EXEC transactions (not yet supported by Webdis).
*
* The connection parameters supported by this class are:
*
* - scheme: must be 'http'.
* - host: hostname or IP address of the server.
* - port: TCP port of the server.
* - timeout: timeout to perform the connection (default is 5 seconds).
* - user: username for authentication.
* - pass: password for authentication.
*
* @link http://webd.is
* @link http://github.com/nicolasff/webdis
* @link http://github.com/seppo0010/phpiredis
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class WebdisConnection implements NodeConnectionInterface
{
private $parameters;
private $resource;
private $reader;
/**
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @throws \InvalidArgumentException
*/
public function __construct(ParametersInterface $parameters)
{
$this->assertExtensions();
if ($parameters->scheme !== 'http') {
throw new \InvalidArgumentException("Invalid scheme: '{$parameters->scheme}'.");
}
$this->parameters = $parameters;
$this->resource = $this->createCurl();
$this->reader = $this->createReader();
}
/**
* Frees the underlying cURL and protocol reader resources when the garbage
* collector kicks in.
*/
public function __destruct()
{
curl_close($this->resource);
phpiredis_reader_destroy($this->reader);
}
/**
* Helper method used to throw on unsupported methods.
*
* @param string $method Name of the unsupported method.
*
* @throws NotSupportedException
*/
private function throwNotSupportedException($method)
{
$class = __CLASS__;
throw new NotSupportedException("The method $class::$method() is not supported.");
}
/**
* Checks if the cURL and phpiredis extensions are loaded in PHP.
*/
private function assertExtensions()
{
if (!extension_loaded('curl')) {
throw new NotSupportedException(
'The "curl" extension is required by this connection backend.'
);
}
if (!extension_loaded('phpiredis')) {
throw new NotSupportedException(
'The "phpiredis" extension is required by this connection backend.'
);
}
}
/**
* Initializes cURL.
*
* @return resource
*/
private function createCurl()
{
$parameters = $this->getParameters();
$timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0) * 1000;
if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
$host = "[$host]";
}
$options = array(
CURLOPT_FAILONERROR => true,
CURLOPT_CONNECTTIMEOUT_MS => $timeout,
CURLOPT_URL => "$parameters->scheme://$host:$parameters->port",
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
CURLOPT_POST => true,
CURLOPT_WRITEFUNCTION => array($this, 'feedReader'),
);
if (isset($parameters->user, $parameters->pass)) {
$options[CURLOPT_USERPWD] = "{$parameters->user}:{$parameters->pass}";
}
curl_setopt_array($resource = curl_init(), $options);
return $resource;
}
/**
* Initializes the phpiredis protocol reader.
*
* @return resource
*/
private function createReader()
{
$reader = phpiredis_reader_create();
phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
return $reader;
}
/**
* Returns the handler used by the protocol reader for inline responses.
*
* @return \Closure
*/
protected function getStatusHandler()
{
static $statusHandler;
if (!$statusHandler) {
$statusHandler = function ($payload) {
return StatusResponse::get($payload);
};
}
return $statusHandler;
}
/**
* Returns the handler used by the protocol reader for error responses.
*
* @return \Closure
*/
protected function getErrorHandler()
{
static $errorHandler;
if (!$errorHandler) {
$errorHandler = function ($errorMessage) {
return new ErrorResponse($errorMessage);
};
}
return $errorHandler;
}
/**
* Feeds the phpredis reader resource with the data read from the network.
*
* @param resource $resource Reader resource.
* @param string $buffer Buffer of data read from a connection.
*
* @return int
*/
protected function feedReader($resource, $buffer)
{
phpiredis_reader_feed($this->reader, $buffer);
return strlen($buffer);
}
/**
* {@inheritdoc}
*/
public function connect()
{
// NOOP
}
/**
* {@inheritdoc}
*/
public function disconnect()
{
// NOOP
}
/**
* {@inheritdoc}
*/
public function isConnected()
{
return true;
}
/**
* Checks if the specified command is supported by this connection class.
*
* @param CommandInterface $command Command instance.
*
* @throws NotSupportedException
*
* @return string
*/
protected function getCommandId(CommandInterface $command)
{
switch ($commandID = $command->getId()) {
case 'AUTH':
case 'SELECT':
case 'MULTI':
case 'EXEC':
case 'WATCH':
case 'UNWATCH':
case 'DISCARD':
case 'MONITOR':
throw new NotSupportedException("Command '$commandID' is not allowed by Webdis.");
default:
return $commandID;
}
}
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$this->throwNotSupportedException(__FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function readResponse(CommandInterface $command)
{
$this->throwNotSupportedException(__FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function executeCommand(CommandInterface $command)
{
$resource = $this->resource;
$commandId = $this->getCommandId($command);
if ($arguments = $command->getArguments()) {
$arguments = implode('/', array_map('urlencode', $arguments));
$serializedCommand = "$commandId/$arguments.raw";
} else {
$serializedCommand = "$commandId.raw";
}
curl_setopt($resource, CURLOPT_POSTFIELDS, $serializedCommand);
if (curl_exec($resource) === false) {
$error = curl_error($resource);
$errno = curl_errno($resource);
throw new ConnectionException($this, trim($error), $errno);
}
if (phpiredis_reader_get_state($this->reader) !== PHPIREDIS_READER_STATE_COMPLETE) {
throw new ProtocolException($this, phpiredis_reader_get_error($this->reader));
}
return phpiredis_reader_get_reply($this->reader);
}
/**
* {@inheritdoc}
*/
public function getResource()
{
return $this->resource;
}
/**
* {@inheritdoc}
*/
public function getParameters()
{
return $this->parameters;
}
/**
* {@inheritdoc}
*/
public function addConnectCommand(CommandInterface $command)
{
$this->throwNotSupportedException(__FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function read()
{
$this->throwNotSupportedException(__FUNCTION__);
}
/**
* {@inheritdoc}
*/
public function __toString()
{
return "{$this->parameters->host}:{$this->parameters->port}";
}
/**
* {@inheritdoc}
*/
public function __sleep()
{
return array('parameters');
}
/**
* {@inheritdoc}
*/
public function __wakeup()
{
$this->assertExtensions();
$this->resource = $this->createCurl();
$this->reader = $this->createReader();
}
}