Skip to content

Commit

Permalink
Merge branch 'pgadvisory'
Browse files Browse the repository at this point in the history
  • Loading branch information
willemstuursma committed May 26, 2018
2 parents df19397 + 51dae5f commit c79c8df
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 57 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ env:

services:
- memcached
- mysql
- postgresql
- redis-server

matrix:
Expand All @@ -36,13 +38,13 @@ before_install:
- redis-server --port 63791 &

install:
- composer require --dev squizlabs/php_codesniffer
- composer install --no-scripts --no-suggest --no-interaction

before_script:
- mysql -e 'create database test;'
- psql -c 'create database test;' -U postgres

script:
- vendor/bin/phpunit
- vendor/bin/phpunit --debug
- vendor/bin/phpcs --standard=PSR2 classes/ tests/

62 changes: 62 additions & 0 deletions classes/mutex/PgAdvisoryLockMutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace malkusch\lock\mutex;

use malkusch\lock\exception\LockAcquireException;
use malkusch\lock\exception\TimeoutException;

class PgAdvisoryLockMutex extends LockMutex
{
/**
* @var \PDO
*/
private $pdo;

/**
* @var int
*/
private $key1;

/**
* @var int
*/
private $key2;

/**
* @throws \RuntimeException
*/
public function __construct(\PDO $PDO, $name)
{
$this->pdo = $PDO;

$hashed_name = hash("sha256", $name, true);

if (false === $hashed_name) {
throw new \RuntimeException("Unable to hash the key, sha256 algorithm is not supported.");
}

list($bytes1, $bytes2) = str_split($hashed_name, 4);

$this->key1 = unpack("i", $bytes1)[1];
$this->key2 = unpack("i", $bytes2)[1];
}

public function lock()
{
$statement = $this->pdo->prepare("SELECT pg_advisory_lock(?,?)");

$statement->execute([
$this->key1,
$this->key2,
]);
}

public function unlock()
{
$statement = $this->pdo->prepare("SELECT pg_advisory_unlock(?,?)");
$statement->execute([
$this->key1,
$this->key2
]);
}
}
2 changes: 1 addition & 1 deletion classes/util/Loop.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public function execute(callable $code)
break;
}

$min = $minWait * 2 ** $i;
$min = (int) $minWait * 1.5 ** $i;
$max = $min * 2;

/*
Expand Down
12 changes: 10 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,33 @@
"autoload": {
"psr-4": {"malkusch\\lock\\": "classes/"}
},
"config": {
"sort-packages": true
},
"require": {
"php": ">=5.6",
"psr/log": "^1",
"paragonie/random_compat": "^1|^2"
},
"require-dev": {
"ext-memcached": "*",
"ext-redis": "*",
"ext-pcntl": "*",
"ext-pdo_mysql": "*",
"ext-pdo_sqlite": "*",
"ext-redis": "*",
"johnkary/phpunit-speedtrap": "^1.0",
"kriswallsmith/spork": "^0.3",
"mikey179/vfsStream": "^1.5.0",
"phpunit/phpunit": "^5",
"php-mock/php-mock-phpunit": "^1",
"phpunit/phpunit": "^5",
"predis/predis": "~1.0",
"squizlabs/php_codesniffer": "^3.2",
"zetacomponents/system-information": "~1.1"
},
"archive": {
"exclude": ["/tests"]
},
"scripts": {
"fix-cs": "vendor/bin/phpcbf --standard=PSR2 classes/ tests/"
}
}
3 changes: 3 additions & 0 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@
<testsuite>
<directory>tests</directory>
</testsuite>
<listeners>
<listener class="JohnKary\PHPUnit\Listener\SpeedTrapListener" />
</listeners>
</phpunit>
40 changes: 33 additions & 7 deletions tests/mutex/MutexConcurrencyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class MutexConcurrencyTest extends \PHPUnit_Framework_TestCase
*/
private $pdo;

/**
* @var string
*/
private $path;

protected function tearDown()
{
if ($this->path) {
unlink($this->path);
}

parent::tearDown();
}

/**
* Gets a PDO instance.
*
Expand Down Expand Up @@ -102,13 +116,16 @@ public function provideTestHighContention()
{
$cases = array_map(function (array $mutexFactory) {
$file = tmpfile();
fwrite($file, pack("i", 0));
$this->assertEquals(4, fwrite($file, pack("i", 0)), "Expected 4 bytes to be written to temporary file.");

return [
function ($increment) use ($file) {
rewind($file);
flock($file, LOCK_EX);
$data = fread($file, 4);

$this->assertEquals(4, strlen($data), "Expected four bytes to be present in temporary file.");

$counter = unpack("i", $data)[1];

$counter += $increment;
Expand Down Expand Up @@ -209,16 +226,16 @@ public function testSerialisation(callable $mutexFactory)
*/
public function provideMutexFactories()
{
$path = stream_get_meta_data(tmpfile())["uri"];
$this->path = tempnam(sys_get_temp_dir(), "mutex-concurrency-test");

$cases = [
"flock" => [function ($timeout = 3) use ($path) {
$file = fopen($path, "w");
"flock" => [function ($timeout = 3) {
$file = fopen($this->path, "w");
return new FlockMutex($file);
}],

"semaphore" => [function ($timeout = 3) use ($path) {
$semaphore = sem_get(ftok($path, "b"));
"semaphore" => [function ($timeout = 3) {
$semaphore = sem_get(ftok($this->path, "b"));
$this->assertTrue(is_resource($semaphore));
return new SemaphoreMutex($semaphore);
}],
Expand Down Expand Up @@ -273,6 +290,15 @@ function ($uri) {
return new MySQLMutex($pdo, "test", $timeout);
}];
}

if (getenv("PGSQL_DSN")) {
$cases["PgAdvisoryLockMutex"] = [function () {
$pdo = new \PDO(getenv("PGSQL_DSN"), getenv("PGSQL_USER"));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new PgAdvisoryLockMutex($pdo, "test");
}];
}

return $cases;
}
Expand Down
40 changes: 10 additions & 30 deletions tests/mutex/MutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,16 @@ function ($uri) {
$pdo = new \PDO(getenv("MYSQL_DSN"), getenv("MYSQL_USER"));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new MySQLMutex($pdo, "test", self::TIMEOUT);
return new MySQLMutex($pdo, "test" . time(), self::TIMEOUT);
}];
}

if (getenv("PGSQL_DSN")) {
$cases["PgAdvisoryLockMutex"] = [function () {
$pdo = new \PDO(getenv("PGSQL_DSN"), getenv("PGSQL_USER"));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new PgAdvisoryLockMutex($pdo, "test");
}];
}

Expand Down Expand Up @@ -151,36 +160,7 @@ public function testRelease(callable $mutexFactory)
$mutex->synchronized(function () {
});
}

/**
* Tests that locks will be released automatically.
*
* @param callable $mutexFactory The Mutex factory.
* @test
* @dataProvider provideMutexFactories
*/
public function testLiveness(callable $mutexFactory)
{
$manager = new ProcessManager();
$manager->setDebug(true);

$manager->fork(function () use ($mutexFactory) {
$mutex = call_user_func($mutexFactory);
$mutex->synchronized(function () {
exit;
});
});
$manager->wait();

sleep(self::TIMEOUT - 1);

$mutex = call_user_func($mutexFactory);
$mutex->synchronized(function () {
});

$manager->check();
}

/**
* Tests synchronized() rethrows the exception of the code.
*
Expand Down
95 changes: 95 additions & 0 deletions tests/mutex/PgAdvisoryLockMutexTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

namespace malkusch\lock\mutex;

use malkusch\lock\exception\LockAcquireException;
use malkusch\lock\exception\LockReleaseException;

/**
* @author Willem Stuursma-Ruwen <[email protected]>
* @link bitcoin:1P5FAZ4QhXCuwYPnLZdk3PJsqePbu1UDDA Donations
* @license WTFPL
*/
class PgAdvisoryLockMutexTest extends \PHPUnit_Framework_TestCase
{
/**
* @var \PDO|\PHPUnit_Framework_MockObject_MockObject
*/
private $pdo;

/**
* @var PgAdvisoryLockMutex
*/
private $mutex;

protected function setUp()
{
parent::setUp();

$this->pdo = $this->createMock(\PDO::class);

$this->mutex = new PgAdvisoryLockMutex($this->pdo, "test" . uniqid());
}

public function testAcquireLock()
{
$statement = $this->createMock(\PDOStatement::class);

$this->pdo->expects($this->once())
->method("prepare")
->with("SELECT pg_advisory_lock(?,?)")
->willReturn($statement);

$statement->expects($this->once())
->method("execute")
->with(
$this->logicalAnd(
$this->isType("array"),
$this->countOf(2),
$this->callback(function (...$arguments) {
$integers = $arguments[0];

foreach ($integers as $each) {
$this->assertInternalType("integer", $each);
}

return true;
})
)
);

$this->mutex->lock();
}

public function testReleaseLock()
{
$statement = $this->createMock(\PDOStatement::class);

$this->pdo->expects($this->once())
->method("prepare")
->with("SELECT pg_advisory_unlock(?,?)")
->willReturn($statement);

$statement->expects($this->once())
->method("execute")
->with(
$this->logicalAnd(
$this->isType("array"),
$this->countOf(2),
$this->callback(function (...$arguments) {
$integers = $arguments[0];

foreach ($integers as $each) {
$this->assertLessThan(1 << 32, $each);
$this->assertGreaterThan(-(1 << 32), $each);
$this->assertInternalType("integer", $each);
}

return true;
})
)
);

$this->mutex->unlock();
}
}
Loading

0 comments on commit c79c8df

Please sign in to comment.