Skip to content

Commit

Permalink
Checking connection state of the socket
Browse files Browse the repository at this point in the history
  • Loading branch information
edefimov committed May 23, 2015
1 parent 04e08eb commit 0b0ed3d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 13 deletions.
132 changes: 121 additions & 11 deletions src/Socket/AbstractSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,40 @@ abstract class AbstractSocket implements SocketInterface
*/
const SOCKET_BUFFER_SIZE = 8192;

/**
* Amount of attempts to set data
*/
const SEND_ATTEMPTS = 10;

/**
* Delay for select operation, microseconds
*/
const SELECT_DELAY = 25000;

/**
* Disconnected state
*/
const STATE_DISCONNECTED = 0;

/**
* Connected state
*/
const STATE_CONNECTED = 1;

/**
* This socket resource
*
* @var resource
*/
private $resource;

/**
* Socket state
*
* @var int
*/
private $state;

/**
* Destructor
*/
Expand Down Expand Up @@ -60,6 +87,7 @@ public function open($address, $context = null)
/** {@inheritdoc} */
public function close()
{
$this->state = self::STATE_DISCONNECTED;
if ($this->resource) {
stream_socket_shutdown($this->resource, STREAM_SHUT_RDWR);
fclose($this->resource);
Expand All @@ -70,15 +98,17 @@ public function close()
/** {@inheritdoc} */
public function read(ChunkSocketResponse $previousResponse = null)
{
$this->setConnectedState();

$result = '';
$microseconds = 25000;
$microseconds = self::SELECT_DELAY;
$isDataChanged = false;
do {
$read = [ $this->resource ];
$nomatter = null;
$select = stream_select($read, $nomatter, $nomatter, 0, $microseconds);
if ($select === false) {
$this->throwNetworkSocketException('Failed to read data');
$this->throwNetworkSocketException('Failed to read data.');
}

if ($select === 0) {
Expand Down Expand Up @@ -109,16 +139,30 @@ public function read(ChunkSocketResponse $previousResponse = null)
/** {@inheritdoc} */
public function write($data)
{
$result = 0;
$dataLength = strlen($data);
$this->setConnectedState();

$result = 0;
$dataLength = strlen($data);
$microseconds = self::SELECT_DELAY;
$attempts = self::SEND_ATTEMPTS;

do {
$written = fwrite($this->resource, $data, strlen($data));
if ($written === false) {
$this->throwNetworkSocketException('Socket write failed');
$write = [ $this->resource ];
$nomatter = null;
$select = stream_select($nomatter, $write, $nomatter, 0, $microseconds);
if ($select === false) {
$this->throwNetworkSocketException('Failed to send data.');
}

$data = substr($data, $written);
$result += $written;
$bytesWritten = $write ? $this->writeActualData($data) : 0;
$attempts = $bytesWritten === 0 ? $attempts - 1 : self::SEND_ATTEMPTS;

if (!$attempts && $result !== $dataLength) {
$this->throwNetworkSocketException('Failed to send data.');
}

$data = substr($data, $bytesWritten);
$result += $bytesWritten;
} while ($result < $dataLength && $data !== false);

return $result;
Expand All @@ -130,7 +174,7 @@ public function setBlocking($isBlocking)
$result = stream_set_blocking($this->resource, $isBlocking ? 1 : 0);
if ($result === false) {
$this->throwNetworkSocketException(
'Failed to switch ' . ($isBlocking ? '': 'non-') . 'blocking mode'
'Failed to switch ' . ($isBlocking ? '': 'non-') . 'blocking mode.'
);
}

Expand Down Expand Up @@ -165,9 +209,75 @@ private function readActualData()
{
$data = fread($this->resource, self::SOCKET_BUFFER_SIZE);
if ($data === false) {
$this->throwNetworkSocketException('Failed to read data');
$this->throwNetworkSocketException('Failed to read data.');
}

if ($data === 0) {
$this->throwExceptionIfNotConnected('Remote connection has been lost.');
}

return $data;
}

/**
* Writes given data to socket
*
* @param string $data Data to write
*
* @return int Amount of written bytes
*/
private function writeActualData($data)
{
$test = stream_socket_sendto($this->resource, '');
if ($test !== 0) {
$this->throwNetworkSocketException('Socket write failed.');
}

$written = fwrite($this->resource, $data, strlen($data));
if ($written === false) {
$this->throwNetworkSocketException('Socket write failed.');
}

if ($written === 0) {
$this->throwExceptionIfNotConnected('Remote connection has been lost.');
}

return $written;
}

/**
* Verify, that we are in connected state
*
* @return void
*/
private function setConnectedState()
{
if (!is_resource($this->resource)) {
$message = $this->state === self::STATE_CONNECTED ?
'Connection was unexpectedly closed.' :
'Can not start io operation on uninitialized socket.';
$this->throwNetworkSocketException($message);
}

if ($this->state !== self::STATE_CONNECTED) {
$this->throwExceptionIfNotConnected('Connection refused.');

$this->state = self::STATE_CONNECTED;
}
}

/**
* Checks that we are in connected state
*
* @param string $message Message to pass in exception
*
* @return void
*/
private function throwExceptionIfNotConnected($message)
{
$name = stream_socket_get_name($this->resource, true);
if ($name === false) {
$this->throwNetworkSocketException($message);
}
}
}
4 changes: 2 additions & 2 deletions src/Socket/ClientSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ protected function createSocketResource($address, $context)
$errno,
$errstr,
null,
STREAM_CLIENT_ASYNC_CONNECT,
STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT,
$context
);

if ($errno) {
if ($errno || $resource === false) {
throw new NetworkSocketException($this, $errstr, $errno);
}

Expand Down
10 changes: 10 additions & 0 deletions tests/RequestExecutor/RequestExecutorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ protected function setUp()
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_recvfrom')->setCallable(function () {
return '';
});

PhpFunctionMocker::getPhpFunctionMocker('stream_socket_sendto')->setCallable(function ($handle, $data) {
return strlen($data);
});

PhpFunctionMocker::getPhpFunctionMocker('stream_socket_get_name')->setCallable(function() {
return 'php://temp';
});
}

/** {@inheritdoc} */
Expand All @@ -77,6 +85,8 @@ protected function tearDown()
PhpFunctionMocker::getPhpFunctionMocker('microtime')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_select')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_recvfrom')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_sendto')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_get_name')->restoreNativeHandler();
}

/**
Expand Down
11 changes: 11 additions & 0 deletions tests/Socket/ClientSocketTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public function testWritePartialContent()
return 0;
});

$mocker = PhpFunctionMocker::getPhpFunctionMocker('stream_socket_sendto');
$mocker->setCallable(function ($handle, $data) {
return strlen($data);
});

$this->socket->open('it has no meaning here');
$this->socket->write($testString);
self::assertEquals($testString, $retString, 'Unexpected result was read');
Expand Down Expand Up @@ -282,15 +287,21 @@ protected function setUp()
$mocker->setCallable(function () {
return fopen('php://temp', 'rw');
});

PhpFunctionMocker::getPhpFunctionMocker('stream_socket_get_name')->setCallable(function() {
return 'php://temp';
});
}

/** {@inheritdoc} */
protected function tearDown()
{
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_client')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_get_name')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_set_blocking')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('fread')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_recvfrom')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_socket_sendto')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('stream_select')->restoreNativeHandler();
PhpFunctionMocker::getPhpFunctionMocker('fwrite')->restoreNativeHandler();
}
Expand Down

0 comments on commit 0b0ed3d

Please sign in to comment.