diff --git a/.travis.yml b/.travis.yml
index 287eab5f0..19f623a12 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,6 +44,11 @@ cache:
directories:
- $HOME/.composer/cache
+before_install:
+ - echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini
+ - php -m
+ - php -i | grep -C 15 mongo
+
install:
- rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini;
- echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini
diff --git a/README.md b/README.md
index 59b0ae68e..b6df3127b 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,10 @@ Features:
[![Build Status](https://travis-ci.org/php-enqueue/fs.png?branch=master)](https://travis-ci.org/php-enqueue/fs)
[![Total Downloads](https://poser.pugx.org/enqueue/fs/d/total.png)](https://packagist.org/packages/enqueue/fs)
[![Latest Stable Version](https://poser.pugx.org/enqueue/fs/version.png)](https://packagist.org/packages/enqueue/fs)
+ * [Mongodb](docs/transport/mongodb.md)
+[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb)
+[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb)
+[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb)
* [Null](docs/transport/null.md).
[![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null)
[![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null)
diff --git a/bin/run-fun-test.sh b/bin/run-fun-test.sh
index b8150ac54..626741366 100755
--- a/bin/run-fun-test.sh
+++ b/bin/run-fun-test.sh
@@ -3,4 +3,4 @@
set -x
set -e
-COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
\ No newline at end of file
+docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
\ No newline at end of file
diff --git a/bin/test b/bin/test
index 7a91fb8c3..6607d84c9 100755
--- a/bin/test
+++ b/bin/test
@@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
+waitForService mongo 27017 50
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
diff --git a/composer.json b/composer.json
index 35777310d..c5bbe060f 100644
--- a/composer.json
+++ b/composer.json
@@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
+ "enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
@@ -60,7 +61,8 @@
"platform": {
"ext-amqp": "1.9.3",
"ext-gearman": "1.1",
- "ext-rdkafka": "3.3"
+ "ext-rdkafka": "3.3",
+ "ext-mongodb": "1.3"
}
},
"repositories": [
@@ -143,6 +145,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
+ },
+ {
+ "type": "path",
+ "url": "pkg/mongodb"
}
]
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 01be96cee..77a4773a0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
+ - mongo
volumes:
- './:/mqdev'
environment:
@@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- - RABBITMQ_STOMP_PORT=61613
+ - RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
@@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
+ - MONGO_DSN=mongodb://mongo
rabbitmq:
image: 'enqueue/rabbitmq:latest'
@@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'
+ mongo:
+ image: mongo:3.7
+ ports:
+ - "27017:27017"
+
volumes:
mysql-data:
driver: local
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7c23fb7ec..323e241d5 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -13,10 +13,6 @@ RUN set -x && \
git clone https://github.com/pdezwart/php-amqp.git . && git checkout v1.9.3 && \
phpize --clean && phpize && ./configure && make install
-## confis
-
-# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini
-
## librdkafka
RUN set -x && \
apt-get update && \
@@ -27,10 +23,10 @@ RUN set -x && \
git checkout v0.11.1 && \
./configure && make && make install && \
pecl install rdkafka && \
- echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini && \
- echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
+ echo "extension=rdkafka.so" > /etc/php/7.2/cli/conf.d/10-rdkafka.ini && \
+ echo "extension=rdkafka.so" > /etc/php/7.2/fpm/conf.d/10-rdkafka.ini
-COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
+COPY ./php/cli.ini /etc/php/7.2/cli/conf.d/1-dev_cli.ini
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh
diff --git a/docs/transport/mongodb.md b/docs/transport/mongodb.md
new file mode 100644
index 000000000..8bae7bd97
--- /dev/null
+++ b/docs/transport/mongodb.md
@@ -0,0 +1,142 @@
+# Mongodb transport
+
+Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.
+
+* [Installation](#installation)
+* [Create context](#create-context)
+* [Send message to topic](#send-message-to-topic)
+* [Send message to queue](#send-message-to-queue)
+* [Send priority message](#send-priority-message)
+* [Send expiration message](#send-expiration-message)
+* [Send delayed message](#send-delayed-message)
+* [Consume message](#consume-message)
+
+## Installation
+
+```bash
+$ composer require enqueue/mongodb
+```
+
+## Create context
+
+```php
+ 'mongodb://localhost:27017/db_name',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => '1000',
+]);
+
+$psrContext = $factory->createContext();
+
+// if you have enqueue/enqueue library installed you can use a function from there to create the context
+$psrContext = \Enqueue\dsn_to_context('mongodb:');
+```
+
+## Send message to topic
+
+```php
+createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooTopic, $message);
+```
+
+## Send message to queue
+
+```php
+createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooQueue, $message);
+```
+
+## Send priority message
+
+```php
+createQueue('foo');
+
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()
+ ->setPriority(5) // the higher priority the sooner a message gets to a consumer
+ //
+ ->send($fooQueue, $message)
+;
+```
+
+## Send expiration message
+
+```php
+createMessage('Hello world!');
+
+$psrContext->createProducer()
+ ->setTimeToLive(60000) // 60 sec
+ //
+ ->send($fooQueue, $message)
+;
+```
+
+## Send delayed message
+
+```php
+createMessage('Hello world!');
+
+$psrContext->createProducer()
+ ->setDeliveryDelay(5000) // 5 sec
+
+ ->send($fooQueue, $message)
+;
+````
+
+## Consume message:
+
+```php
+createConsumer($fooQueue);
+
+$message = $consumer->receive();
+
+// process a message
+
+$consumer->acknowledge($message);
+// $consumer->reject($message);
+```
+
+[back to index](../index.md)
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index b3b93f5a2..61c7430cb 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -77,6 +77,10 @@
pkg/gps/Tests
+
+ pkg/mongodb/Tests
+
+
pkg/enqueue-bundle/Tests
diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php
index a62f2db2b..843cd498d 100644
--- a/pkg/enqueue-bundle/EnqueueBundle.php
+++ b/pkg/enqueue-bundle/EnqueueBundle.php
@@ -21,6 +21,7 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
+use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
@@ -112,6 +113,12 @@ class_exists(AmqpLibConnectionFactory::class)
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
}
+ if (class_exists(MongodbTransportFactory::class)) {
+ $extension->setTransportFactory(new MongodbTransportFactory('mongodb'));
+ } else {
+ $extension->setTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb']));
+ }
+
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
}
diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
index cec646a7c..34adc2c68 100644
--- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
+++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
@@ -206,6 +206,13 @@ public function provideEnqueueConfigs()
]];
}
+ yield 'mongodb_dsn' => [[
+ 'transport' => [
+ 'default' => 'mongodb',
+ 'mongodb' => getenv('MONGO_DSN'),
+ ],
+ ]];
+
// yield 'gps' => [[
// 'transport' => [
// 'default' => 'gps',
diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php
index 65b877a30..fa45d2544 100644
--- a/pkg/enqueue/Symfony/DefaultTransportFactory.php
+++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php
@@ -8,6 +8,8 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
+use Enqueue\Mongodb\MongodbConnectionFactory;
+use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
@@ -215,6 +217,10 @@ private function findFactory($dsn)
return new RdKafkaTransportFactory('default_kafka');
}
+ if ($factory instanceof MongodbConnectionFactory) {
+ return new MongodbTransportFactory('default_mongodb');
+ }
+
throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($factory),
diff --git a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
index f123b116b..2c65eb6d9 100644
--- a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
+++ b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
@@ -9,6 +9,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
+use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
@@ -97,5 +98,7 @@ public static function provideDSNs()
yield ['sqs:', SqsConnectionFactory::class];
yield ['gps:', GpsConnectionFactory::class];
+
+ yield ['mongodb:', MongodbConnectionFactory::class];
}
}
diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
index 8f2df545b..a97fb86b8 100644
--- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
+++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
@@ -291,5 +291,7 @@ public static function provideDSNs()
yield ['stomp:', 'default_stomp'];
yield ['kafka:', 'default_kafka'];
+
+ yield ['mongodb:', 'default_mongodb'];
}
}
diff --git a/pkg/enqueue/functions.php b/pkg/enqueue/functions.php
index 8960785b7..537f9964a 100644
--- a/pkg/enqueue/functions.php
+++ b/pkg/enqueue/functions.php
@@ -10,6 +10,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
+use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
@@ -108,6 +109,10 @@ function dsn_to_connection_factory($dsn)
$map['gps'] = GpsConnectionFactory::class;
}
+ if (class_exists(MongodbConnectionFactory::class)) {
+ $map['mongodb'] = MongodbConnectionFactory::class;
+ }
+
list($scheme) = explode(':', $dsn, 2);
if (false == $scheme || false === strpos($dsn, ':')) {
throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn));
diff --git a/pkg/mongodb/.gitignore b/pkg/mongodb/.gitignore
new file mode 100644
index 000000000..57bbbe0bb
--- /dev/null
+++ b/pkg/mongodb/.gitignore
@@ -0,0 +1,7 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
+/examples/
diff --git a/pkg/mongodb/Client/MongodbDriver.php b/pkg/mongodb/Client/MongodbDriver.php
new file mode 100644
index 000000000..aa0d4f999
--- /dev/null
+++ b/pkg/mongodb/Client/MongodbDriver.php
@@ -0,0 +1,186 @@
+ 0,
+ MessagePriority::LOW => 1,
+ MessagePriority::NORMAL => 2,
+ MessagePriority::HIGH => 3,
+ MessagePriority::VERY_HIGH => 4,
+ ];
+
+ /**
+ * @param MongodbContext $context
+ * @param Config $config
+ * @param QueueMetaRegistry $queueMetaRegistry
+ */
+ public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
+ {
+ $this->context = $context;
+ $this->config = $config;
+ $this->queueMetaRegistry = $queueMetaRegistry;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return MongodbMessage
+ */
+ public function createTransportMessage(Message $message)
+ {
+ $properties = $message->getProperties();
+
+ $headers = $message->getHeaders();
+ $headers['content_type'] = $message->getContentType();
+
+ $transportMessage = $this->context->createMessage();
+ $transportMessage->setBody($message->getBody());
+ $transportMessage->setHeaders($headers);
+ $transportMessage->setProperties($properties);
+ $transportMessage->setMessageId($message->getMessageId());
+ $transportMessage->setTimestamp($message->getTimestamp());
+ $transportMessage->setDeliveryDelay($message->getDelay());
+ $transportMessage->setReplyTo($message->getReplyTo());
+ $transportMessage->setCorrelationId($message->getCorrelationId());
+ if (array_key_exists($message->getPriority(), self::$priorityMap)) {
+ $transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
+ }
+
+ return $transportMessage;
+ }
+
+ /**
+ * @param MongodbMessage $message
+ *
+ * {@inheritdoc}
+ */
+ public function createClientMessage(PsrMessage $message)
+ {
+ $clientMessage = new Message();
+
+ $clientMessage->setBody($message->getBody());
+ $clientMessage->setHeaders($message->getHeaders());
+ $clientMessage->setProperties($message->getProperties());
+
+ $clientMessage->setContentType($message->getHeader('content_type'));
+ $clientMessage->setMessageId($message->getMessageId());
+ $clientMessage->setTimestamp($message->getTimestamp());
+ $clientMessage->setDelay($message->getDeliveryDelay());
+ $clientMessage->setReplyTo($message->getReplyTo());
+ $clientMessage->setCorrelationId($message->getCorrelationId());
+
+ $priorityMap = array_flip(self::$priorityMap);
+ $priority = array_key_exists($message->getPriority(), $priorityMap) ?
+ $priorityMap[$message->getPriority()] :
+ MessagePriority::NORMAL;
+ $clientMessage->setPriority($priority);
+
+ return $clientMessage;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToRouter(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
+ throw new \LogicException('Topic name parameter is required but is not set');
+ }
+
+ $queue = $this->createQueue($this->config->getRouterQueueName());
+ $transportMessage = $this->createTransportMessage($message);
+
+ $this->context->createProducer()->send($queue, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToProcessor(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
+ throw new \LogicException('Processor name parameter is required but is not set');
+ }
+
+ if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
+ throw new \LogicException('Queue name parameter is required but is not set');
+ }
+
+ $transportMessage = $this->createTransportMessage($message);
+ $destination = $this->createQueue($queueName);
+
+ $this->context->createProducer()->send($destination, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createQueue($queueName)
+ {
+ $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
+
+ return $this->context->createQueue($transportName);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setupBroker(LoggerInterface $logger = null)
+ {
+ $logger = $logger ?: new NullLogger();
+ $log = function ($text, ...$args) use ($logger) {
+ $logger->debug(sprintf('[MongodbDriver] '.$text, ...$args));
+ };
+ $contextConfig = $this->context->getConfig();
+ $log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
+ $this->context->createCollection();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+
+ /**
+ * @return array
+ */
+ public static function getPriorityMap()
+ {
+ return self::$priorityMap;
+ }
+}
diff --git a/pkg/mongodb/JSON.php b/pkg/mongodb/JSON.php
new file mode 100644
index 000000000..84cac50da
--- /dev/null
+++ b/pkg/mongodb/JSON.php
@@ -0,0 +1,59 @@
+ 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/
+ * 'dbname' => 'enqueue', - database name.
+ * 'collection_name' => 'enqueue' - collection name
+ * 'polling_interval' => '1000', - How often query for new messages (milliseconds)
+ * ]
+ *
+ * or
+ *
+ * mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue
+ *
+ * @param array|string|null $config
+ */
+ public function __construct($config = 'mongodb:')
+ {
+ if (empty($config)) {
+ $config = $this->parseDsn('mongodb:');
+ } elseif (is_string($config)) {
+ $config = $this->parseDsn($config);
+ } elseif (is_array($config)) {
+ } else {
+ throw new \LogicException('The config must be either an array of options, a DSN string or null');
+ }
+ $config = array_replace([
+ 'dsn' => 'mongodb://127.0.0.1/',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ ], $config);
+
+ $this->config = $config;
+ }
+
+ public function createContext()
+ {
+ $client = new Client($this->config['dsn']);
+
+ return new MongodbContext($client, $this->config);
+ }
+
+ public static function parseDsn($dsn)
+ {
+ $parsedUrl = parse_url($dsn);
+ if (false === $parsedUrl) {
+ throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
+ }
+ if (empty($parsedUrl['scheme'])) {
+ throw new \LogicException('Schema is empty');
+ }
+ $supported = [
+ 'mongodb' => true,
+ ];
+ if (false == isset($parsedUrl['scheme'])) {
+ throw new \LogicException(sprintf(
+ 'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
+ $parsedUrl['scheme'],
+ implode('", "', array_keys($supported))
+ ));
+ }
+ if ('mongodb:' === $dsn) {
+ return [
+ 'dsn' => 'mongodb://127.0.0.1/',
+ ];
+ }
+ $config['dsn'] = $dsn;
+ if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
+ $pathParts = explode('/', $parsedUrl['path']);
+ //DB name
+ if ($pathParts[1]) {
+ $config['dbname'] = $pathParts[1];
+ }
+ }
+ if (isset($parsedUrl['query'])) {
+ $queryParts = null;
+ parse_str($parsedUrl['query'], $queryParts);
+ //get enqueue attributes values
+ if (!empty($queryParts['polling_interval'])) {
+ $config['polling_interval'] = $queryParts['polling_interval'];
+ }
+ if (!empty($queryParts['enqueue_collection'])) {
+ $config['collection_name'] = $queryParts['enqueue_collection'];
+ }
+ }
+
+ return $config;
+ }
+}
diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php
new file mode 100644
index 000000000..6fe34289c
--- /dev/null
+++ b/pkg/mongodb/MongodbConsumer.php
@@ -0,0 +1,177 @@
+context = $context;
+ $this->queue = $queue;
+ }
+
+ /**
+ * Set polling interval in milliseconds.
+ *
+ * @param int $msec
+ */
+ public function setPollingInterval($msec)
+ {
+ $this->pollingInterval = $msec * 1000;
+ }
+
+ /**
+ * Get polling interval in milliseconds.
+ *
+ * @return int
+ */
+ public function getPollingInterval()
+ {
+ return (int) $this->pollingInterval / 1000;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return MongodbDestination
+ */
+ public function getQueue()
+ {
+ return $this->queue;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return MongodbMessage|null
+ */
+ public function receive($timeout = 0)
+ {
+ $timeout /= 1000;
+ $startAt = microtime(true);
+
+ while (true) {
+ $message = $this->receiveMessage();
+
+ if ($message) {
+ return $message;
+ }
+
+ if ($timeout && (microtime(true) - $startAt) >= $timeout) {
+ return;
+ }
+
+ usleep($this->pollingInterval);
+
+ if ($timeout && (microtime(true) - $startAt) >= $timeout) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return MongodbMessage|null
+ */
+ public function receiveNoWait()
+ {
+ return $this->receiveMessage();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param MongodbMessage $message
+ */
+ public function acknowledge(PsrMessage $message)
+ {
+ // does nothing
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param MongodbMessage $message
+ */
+ public function reject(PsrMessage $message, $requeue = false)
+ {
+ InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
+
+ if ($requeue) {
+ $this->context->createProducer()->send($this->queue, $message);
+
+ return;
+ }
+ }
+
+ /**
+ * @return MongodbMessage|null
+ */
+ protected function receiveMessage()
+ {
+ $now = time();
+ $collection = $this->context->getCollection();
+ $message = $collection->findOneAndDelete(
+ [
+ '$or' => [
+ ['delayed_until' => ['$exists' => false]],
+ ['delayed_until' => ['$lte' => $now]],
+ ],
+ ],
+ [
+ 'sort' => ['priority' => -1, 'published_at' => 1],
+ 'typeMap' => ['root' => 'array', 'document' => 'array'],
+ ]
+ );
+
+ if (!$message) {
+ return null;
+ }
+ if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
+ return $this->convertMessage($message);
+ }
+ }
+
+ /**
+ * @param array $dbalMessage
+ *
+ * @return MongodbMessage
+ */
+ protected function convertMessage(array $mongodbMessage)
+ {
+ $properties = JSON::decode($mongodbMessage['properties']);
+ $headers = JSON::decode($mongodbMessage['headers']);
+
+ $message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers);
+ $message->setId((string) $mongodbMessage['_id']);
+ $message->setPriority((int) $mongodbMessage['priority']);
+ $message->setRedelivered((bool) $mongodbMessage['redelivered']);
+ $message->setPublishedAt((int) $mongodbMessage['published_at']);
+
+ return $message;
+ }
+}
diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php
new file mode 100644
index 000000000..ce8945e53
--- /dev/null
+++ b/pkg/mongodb/MongodbContext.php
@@ -0,0 +1,110 @@
+config = array_replace([
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => null,
+ ], $config);
+
+ $this->client = $client;
+ }
+
+ public function createMessage($body = '', array $properties = [], array $headers = [])
+ {
+ $message = new MongodbMessage();
+ $message->setBody($body);
+ $message->setProperties($properties);
+ $message->setHeaders($headers);
+
+ return $message;
+ }
+
+ public function createTopic($name)
+ {
+ return new MongodbDestination($name);
+ }
+
+ public function createQueue($queueName)
+ {
+ return new MongodbDestination($queueName);
+ }
+
+ public function createTemporaryQueue()
+ {
+ throw new \BadMethodCallException('Mongodb transport does not support temporary queues');
+ }
+
+ public function createProducer()
+ {
+ return new MongodbProducer($this);
+ }
+
+ public function createConsumer(PsrDestination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
+
+ $consumer = new MongodbConsumer($this, $destination);
+
+ if (isset($this->config['polling_interval'])) {
+ $consumer->setPollingInterval($this->config['polling_interval']);
+ }
+
+ return $consumer;
+ }
+
+ public function close()
+ {
+ // TODO: Implement close() method.
+ }
+
+ public function getCollection()
+ {
+ return $this->client
+ ->selectDatabase($this->config['dbname'])
+ ->selectCollection($this->config['collection_name']);
+ }
+
+ /**
+ * @return Client
+ */
+ public function getClient()
+ {
+ return $this->client;
+ }
+
+ /**
+ * @return array
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+
+ public function createCollection()
+ {
+ $collection = $this->getCollection();
+ $collection->createIndex(['priority' => -1, 'published_at' => 1], ['name' => 'enqueue_priority']);
+ $collection->createIndex(['delayed_until' => 1], ['name' => 'enqueue_delayed']);
+ }
+}
diff --git a/pkg/mongodb/MongodbDestination.php b/pkg/mongodb/MongodbDestination.php
new file mode 100644
index 000000000..360f58a25
--- /dev/null
+++ b/pkg/mongodb/MongodbDestination.php
@@ -0,0 +1,47 @@
+destinationName = $name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueueName()
+ {
+ return $this->destinationName;
+ }
+
+ /**
+ * Alias for getQueueName()
+ * {@inheritdoc}
+ */
+ public function getName()
+ {
+ return $this->getQueueName();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTopicName()
+ {
+ return $this->destinationName;
+ }
+}
diff --git a/pkg/mongodb/MongodbMessage.php b/pkg/mongodb/MongodbMessage.php
new file mode 100644
index 000000000..9a10e5f7e
--- /dev/null
+++ b/pkg/mongodb/MongodbMessage.php
@@ -0,0 +1,316 @@
+body = $body;
+ $this->properties = $properties;
+ $this->headers = $headers;
+ $this->redelivered = false;
+ }
+
+ /**
+ * @param string $id
+ */
+ public function setId($id)
+ {
+ $this->id = $id;
+ }
+
+ /**
+ * @return string $id
+ */
+ public function getId()
+ {
+ return $this->id;
+ }
+
+ /**
+ * @param string $body
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperties(array $properties)
+ {
+ $this->properties = $properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperty($name, $value)
+ {
+ $this->properties[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperties()
+ {
+ return $this->properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperty($name, $default = null)
+ {
+ return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setHeader($name, $value)
+ {
+ $this->headers[$name] = $value;
+ }
+
+ /**
+ * @param array $headers
+ */
+ public function setHeaders(array $headers)
+ {
+ $this->headers = $headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeader($name, $default = null)
+ {
+ return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function isRedelivered()
+ {
+ return $this->redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setRedelivered($redelivered)
+ {
+ $this->redelivered = $redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setReplyTo($replyTo)
+ {
+ $this->setHeader('reply_to', $replyTo);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getReplyTo()
+ {
+ return $this->getHeader('reply_to');
+ }
+
+ /**
+ * @return int
+ */
+ public function getPriority()
+ {
+ return $this->priority;
+ }
+
+ /**
+ * @param int $priority
+ */
+ public function setPriority($priority)
+ {
+ $this->priority = $priority;
+ }
+
+ /**
+ * @return int
+ */
+ public function getDeliveryDelay()
+ {
+ return $this->deliveryDelay;
+ }
+
+ /**
+ * Set delay in milliseconds.
+ *
+ * @param int $deliveryDelay
+ */
+ public function setDeliveryDelay($deliveryDelay)
+ {
+ $this->deliveryDelay = $deliveryDelay;
+ }
+
+ /**
+ * @return int|float|null
+ */
+ public function getTimeToLive()
+ {
+ return $this->timeToLive;
+ }
+
+ /**
+ * Set time to live in milliseconds.
+ *
+ * @param int|float|null $timeToLive
+ */
+ public function setTimeToLive($timeToLive)
+ {
+ $this->timeToLive = $timeToLive;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setCorrelationId($correlationId)
+ {
+ $this->setHeader('correlation_id', $correlationId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getCorrelationId()
+ {
+ return $this->getHeader('correlation_id', null);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setMessageId($messageId)
+ {
+ $this->setHeader('message_id', $messageId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageId()
+ {
+ return $this->getHeader('message_id', null);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimestamp()
+ {
+ $value = $this->getHeader('timestamp');
+
+ return null === $value ? null : (int) $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimestamp($timestamp)
+ {
+ $this->setHeader('timestamp', $timestamp);
+ }
+
+ /**
+ * @return int
+ */
+ public function getPublishedAt()
+ {
+ return $this->publishedAt;
+ }
+
+ /**
+ * @param int $publishedAt
+ */
+ public function setPublishedAt($publishedAt)
+ {
+ $this->publishedAt = $publishedAt;
+ }
+}
diff --git a/pkg/mongodb/MongodbProducer.php b/pkg/mongodb/MongodbProducer.php
new file mode 100644
index 000000000..c5132b62c
--- /dev/null
+++ b/pkg/mongodb/MongodbProducer.php
@@ -0,0 +1,181 @@
+context = $context;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param MongodbDestination $destination
+ * @param MongodbMessage $message
+ *
+ * @throws Exception
+ */
+ public function send(PsrDestination $destination, PsrMessage $message)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
+ InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
+
+ if (null !== $this->priority && null === $message->getPriority()) {
+ $message->setPriority($this->priority);
+ }
+ if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) {
+ $message->setDeliveryDelay($this->deliveryDelay);
+ }
+ if (null !== $this->timeToLive && null === $message->getTimeToLive()) {
+ $message->setTimeToLive($this->timeToLive);
+ }
+
+ $body = $message->getBody();
+ if (is_scalar($body) || null === $body) {
+ $body = (string) $body;
+ } else {
+ throw new InvalidMessageException(sprintf(
+ 'The message body must be a scalar or null. Got: %s',
+ is_object($body) ? get_class($body) : gettype($body)
+ ));
+ }
+
+ $publishedAt = null !== $message->getPublishedAt() ?
+ $message->getPublishedAt() :
+ (int) (microtime(true) * 10000)
+ ;
+
+ $mongoMessage = [
+ 'published_at' => $publishedAt,
+ 'body' => $body,
+ 'headers' => JSON::encode($message->getHeaders()),
+ 'properties' => JSON::encode($message->getProperties()),
+ 'priority' => $message->getPriority(),
+ 'queue' => $destination->getName(),
+ 'redelivered' => $message->isRedelivered(),
+ ];
+
+ $delay = $message->getDeliveryDelay();
+ if ($delay) {
+ if (!is_int($delay)) {
+ throw new \LogicException(sprintf(
+ 'Delay must be integer but got: "%s"',
+ is_object($delay) ? get_class($delay) : gettype($delay)
+ ));
+ }
+
+ if ($delay <= 0) {
+ throw new \LogicException(sprintf('Delay must be positive integer but got: "%s"', $delay));
+ }
+
+ $mongoMessage['delayed_until'] = time() + (int) $delay / 1000;
+ }
+
+ $timeToLive = $message->getTimeToLive();
+ if ($timeToLive) {
+ if (!is_int($timeToLive)) {
+ throw new \LogicException(sprintf(
+ 'TimeToLive must be integer but got: "%s"',
+ is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive)
+ ));
+ }
+
+ if ($timeToLive <= 0) {
+ throw new \LogicException(sprintf('TimeToLive must be positive integer but got: "%s"', $timeToLive));
+ }
+
+ $mongoMessage['time_to_live'] = time() + (int) $timeToLive / 1000;
+ }
+
+ try {
+ $collection = $this->context->getCollection();
+ $collection->insertOne($mongoMessage);
+ } catch (\Exception $e) {
+ throw new Exception('The transport has failed to send the message due to some internal error.', null, $e);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setDeliveryDelay($deliveryDelay)
+ {
+ $this->deliveryDelay = $deliveryDelay;
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getDeliveryDelay()
+ {
+ return $this->deliveryDelay;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setPriority($priority)
+ {
+ $this->priority = $priority;
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getPriority()
+ {
+ return $this->priority;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimeToLive($timeToLive)
+ {
+ $this->timeToLive = $timeToLive;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimeToLive()
+ {
+ return $this->timeToLive;
+ }
+}
diff --git a/pkg/mongodb/README.md b/pkg/mongodb/README.md
new file mode 100644
index 000000000..d29cde0f2
--- /dev/null
+++ b/pkg/mongodb/README.md
@@ -0,0 +1,27 @@
+# Mongodb Transport
+
+[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)
+[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb)
+[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb)
+[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb)
+
+This is an implementation of the queue specification. It allows you to use MongoDB database as a message broker.
+
+## Resources
+
+* [Site](https://enqueue.forma-pro.com/)
+* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Questions](https://gitter.im/php-enqueue/Lobby)
+* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
+
+## Developed by Forma-Pro
+
+Forma-Pro is a full stack development company which interests also spread to open source development.
+Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
+Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.
+
+If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com
+
+## License
+
+It is released under the [MIT License](LICENSE).
\ No newline at end of file
diff --git a/pkg/mongodb/Symfony/MongodbTransportFactory.php b/pkg/mongodb/Symfony/MongodbTransportFactory.php
new file mode 100644
index 000000000..3486d8a8e
--- /dev/null
+++ b/pkg/mongodb/Symfony/MongodbTransportFactory.php
@@ -0,0 +1,119 @@
+name = $name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addConfiguration(ArrayNodeDefinition $builder)
+ {
+ $builder
+ ->beforeNormalization()
+ ->ifString()
+ ->then(function ($v) {
+ return ['dsn' => $v];
+ })
+ ->end()
+ ->children()
+ ->scalarNode('dsn')
+ ->info('The Mongodb DSN. Other parameters are ignored if set')
+ ->isRequired()
+ ->end()
+ ->scalarNode('dbname')
+ ->defaultValue('enqueue')
+ ->info('Database name.')
+ ->end()
+ ->scalarNode('collection_name')
+ ->defaultValue('enqueue')
+ ->info('Collection')
+ ->end()
+ ->integerNode('polling_interval')
+ ->defaultValue(1000)
+ ->min(100)
+ ->info('How often query for new messages.')
+ ->end()
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createConnectionFactory(ContainerBuilder $container, array $config)
+ {
+ $factory = new Definition(MongodbConnectionFactory::class, [$config]);
+
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+ $container->setDefinition($factoryId, $factory);
+
+ return $factoryId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createContext(ContainerBuilder $container, array $config)
+ {
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+
+ $context = new Definition(MongodbContext::class);
+ $context->setPublic(true);
+ $context->setFactory([new Reference($factoryId), 'createContext']);
+
+ $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
+ $container->setDefinition($contextId, $context);
+
+ return $contextId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createDriver(ContainerBuilder $container, array $config)
+ {
+ $driver = new Definition(MongodbDriver::class);
+ $driver->setPublic(true);
+ $driver->setArguments([
+ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
+ new Reference('enqueue.client.config'),
+ new Reference('enqueue.client.meta.queue_meta_registry'),
+ ]);
+
+ $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
+ $container->setDefinition($driverId, $driver);
+
+ return $driverId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getName()
+ {
+ return $this->name;
+ }
+}
diff --git a/pkg/mongodb/Tests/Client/MongodbDriverTest.php b/pkg/mongodb/Tests/Client/MongodbDriverTest.php
new file mode 100644
index 000000000..1d4bb60ad
--- /dev/null
+++ b/pkg/mongodb/Tests/Client/MongodbDriverTest.php
@@ -0,0 +1,351 @@
+assertClassImplements(DriverInterface::class, MongodbDriver::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new MongodbDriver(
+ $this->createPsrContextMock(),
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+ }
+
+ public function testShouldReturnConfigObject()
+ {
+ $config = $this->createDummyConfig();
+
+ $driver = new MongodbDriver(
+ $this->createPsrContextMock(),
+ $config,
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $this->assertSame($config, $driver->getConfig());
+ }
+
+ public function testShouldCreateAndReturnQueueInstance()
+ {
+ $expectedQueue = new MongodbDestination('aName');
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('aprefix.afooqueue')
+ ->willReturn($expectedQueue)
+ ;
+
+ $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry());
+
+ $queue = $driver->createQueue('aFooQueue');
+
+ $this->assertSame($expectedQueue, $queue);
+ }
+
+ public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName()
+ {
+ $expectedQueue = new MongodbDestination('aName');
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('aBarQueue')
+ ->willReturn($expectedQueue)
+ ;
+
+ $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry());
+
+ $queue = $driver->createQueue('aBarQueue');
+
+ $this->assertSame($expectedQueue, $queue);
+ }
+
+ public function testShouldConvertTransportMessageToClientMessage()
+ {
+ $transportMessage = new MongodbMessage();
+ $transportMessage->setBody('body');
+ $transportMessage->setHeaders(['hkey' => 'hval']);
+ $transportMessage->setProperties(['key' => 'val']);
+ $transportMessage->setHeader('content_type', 'ContentType');
+ $transportMessage->setMessageId('MessageId');
+ $transportMessage->setTimestamp(1000);
+ $transportMessage->setPriority(2);
+ $transportMessage->setDeliveryDelay(12345);
+
+ $driver = new MongodbDriver(
+ $this->createPsrContextMock(),
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $clientMessage = $driver->createClientMessage($transportMessage);
+
+ $this->assertInstanceOf(Message::class, $clientMessage);
+ $this->assertSame('body', $clientMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ ], $clientMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $clientMessage->getProperties());
+ $this->assertSame('MessageId', $clientMessage->getMessageId());
+ $this->assertSame('ContentType', $clientMessage->getContentType());
+ $this->assertSame(1000, $clientMessage->getTimestamp());
+ $this->assertSame(12345, $clientMessage->getDelay());
+
+ $this->assertNull($clientMessage->getExpire());
+ $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority());
+ }
+
+ public function testShouldConvertClientMessageToTransportMessage()
+ {
+ $clientMessage = new Message();
+ $clientMessage->setBody('body');
+ $clientMessage->setHeaders(['hkey' => 'hval']);
+ $clientMessage->setProperties(['key' => 'val']);
+ $clientMessage->setContentType('ContentType');
+ $clientMessage->setExpire(123);
+ $clientMessage->setPriority(MessagePriority::VERY_HIGH);
+ $clientMessage->setMessageId('MessageId');
+ $clientMessage->setTimestamp(1000);
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn(new MongodbMessage())
+ ;
+
+ $driver = new MongodbDriver(
+ $context,
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $transportMessage = $driver->createTransportMessage($clientMessage);
+
+ $this->assertInstanceOf(MongodbMessage::class, $transportMessage);
+ $this->assertSame('body', $transportMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ 'reply_to' => null,
+ 'correlation_id' => null,
+ ], $transportMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $transportMessage->getProperties());
+ $this->assertSame('MessageId', $transportMessage->getMessageId());
+ $this->assertSame(1000, $transportMessage->getTimestamp());
+ }
+
+ public function testShouldSendMessageToRouter()
+ {
+ $topic = new MongodbDestination('queue-name');
+ $transportMessage = new MongodbMessage();
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('aprefix.default')
+ ->willReturn($topic)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new MongodbDriver(
+ $context,
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic');
+
+ $driver->sendToRouter($message);
+ }
+
+ public function testShouldThrowExceptionIfTopicParameterIsNotSet()
+ {
+ $driver = new MongodbDriver(
+ $this->createPsrContextMock(),
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Topic name parameter is required but is not set');
+
+ $driver->sendToRouter(new Message());
+ }
+
+ public function testShouldSendMessageToProcessor()
+ {
+ $queue = new MongodbDestination('queue-name');
+ $transportMessage = new MongodbMessage();
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($queue), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->willReturn($queue)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new MongodbDriver(
+ $context,
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+ $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet()
+ {
+ $driver = new MongodbDriver(
+ $this->createPsrContextMock(),
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Processor name parameter is required but is not set');
+
+ $driver->sendToProcessor(new Message());
+ }
+
+ public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet()
+ {
+ $driver = new MongodbDriver(
+ $this->createPsrContextMock(),
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Queue name parameter is required but is not set');
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldSetupBroker()
+ {
+ $context = $this->createPsrContextMock();
+
+ $context
+ ->expects($this->once())
+ ->method('createCollection')
+ ;
+
+ $driver = new MongodbDriver(
+ $context,
+ $this->createDummyConfig(),
+ $this->createDummyQueueMetaRegistry()
+ );
+
+ $driver->setupBroker();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
+ */
+ private function createPsrContextMock()
+ {
+ return $this->createMock(MongodbContext::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer
+ */
+ private function createPsrProducerMock()
+ {
+ return $this->createMock(PsrProducer::class);
+ }
+
+ /**
+ * @return QueueMetaRegistry
+ */
+ private function createDummyQueueMetaRegistry()
+ {
+ $registry = new QueueMetaRegistry($this->createDummyConfig(), []);
+ $registry->add('default');
+ $registry->add('aFooQueue');
+ $registry->add('aBarQueue', 'aBarQueue');
+
+ return $registry;
+ }
+
+ /**
+ * @return Config
+ */
+ private function createDummyConfig()
+ {
+ return Config::create('aPrefix');
+ }
+}
diff --git a/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php
new file mode 100644
index 000000000..609a22e16
--- /dev/null
+++ b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php
@@ -0,0 +1,104 @@
+context = $this->buildMongodbContext();
+ }
+
+ protected function tearDown()
+ {
+ if ($this->context) {
+ $this->context->close();
+ }
+
+ parent::tearDown();
+ }
+
+ public function testShouldSetPublishedAtDateToReceivedMessage()
+ {
+ $context = $this->context;
+ $queue = $context->createQueue(__METHOD__);
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $time = (int) (microtime(true) * 10000);
+
+ $expectedBody = __CLASS__.$time;
+
+ $producer = $context->createProducer();
+
+ $message = $context->createMessage($expectedBody);
+ $message->setPublishedAt($time);
+ $producer->send($queue, $message);
+
+ $message = $consumer->receive(8000); // 8 sec
+
+ $this->assertInstanceOf(MongodbMessage::class, $message);
+ $consumer->acknowledge($message);
+ $this->assertSame($expectedBody, $message->getBody());
+ $this->assertSame($time, $message->getPublishedAt());
+ }
+
+ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate()
+ {
+ $context = $this->context;
+ $queue = $context->createQueue(__METHOD__);
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $time = (int) (microtime(true) * 10000);
+ $olderTime = $time - 10000;
+
+ $expectedPriority5Body = __CLASS__.'_priority5_'.$time;
+ $expectedPriority5BodyOlderTime = __CLASS__.'_priority5Old_'.$olderTime;
+
+ $producer = $context->createProducer();
+
+ $message = $context->createMessage($expectedPriority5Body);
+ $message->setPriority(5);
+ $message->setPublishedAt($time);
+ $producer->send($queue, $message);
+
+ $message = $context->createMessage($expectedPriority5BodyOlderTime);
+ $message->setPriority(5);
+ $message->setPublishedAt($olderTime);
+ $producer->send($queue, $message);
+
+ $message = $consumer->receive(8000); // 8 sec
+
+ $this->assertInstanceOf(MongodbMessage::class, $message);
+ $consumer->acknowledge($message);
+ $this->assertSame($expectedPriority5BodyOlderTime, $message->getBody());
+
+ $message = $consumer->receive(8000); // 8 sec
+
+ $this->assertInstanceOf(MongodbMessage::class, $message);
+ $consumer->acknowledge($message);
+ $this->assertSame($expectedPriority5Body, $message->getBody());
+ }
+}
diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php
new file mode 100644
index 000000000..daaef5102
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php
@@ -0,0 +1,57 @@
+assertClassImplements(PsrConnectionFactory::class, MongodbConnectionFactory::class);
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $params = [
+ 'dsn' => 'mongodb://127.0.0.1/',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ ];
+
+ $factory = new MongodbConnectionFactory();
+ $this->assertAttributeEquals($params, 'config', $factory);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $params = [
+ 'dsn' => 'mongodb://127.0.0.3/',
+ 'uriOptions' => ['testValue' => 123],
+ 'driverOptions' => ['testValue' => 123],
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ ];
+
+ $factory = new MongodbConnectionFactory($params);
+
+ $this->assertAttributeEquals($params, 'config', $factory);
+ }
+
+ public function testShouldCreateContext()
+ {
+ $factory = new MongodbConnectionFactory();
+
+ $context = $factory->createContext();
+
+ $this->assertInstanceOf(MongodbContext::class, $context);
+ }
+}
diff --git a/pkg/mongodb/Tests/MongodbConsumerTest.php b/pkg/mongodb/Tests/MongodbConsumerTest.php
new file mode 100644
index 000000000..0c2e5c664
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbConsumerTest.php
@@ -0,0 +1,212 @@
+assertClassImplements(PsrConsumer::class, MongodbConsumer::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue'));
+ }
+
+ public function testShouldReturnInstanceOfDestination()
+ {
+ $destination = new MongodbDestination('queue');
+
+ $consumer = new MongodbConsumer($this->createContextMock(), $destination);
+
+ $this->assertSame($destination, $consumer->getQueue());
+ }
+
+ public function testCouldCallAcknowledgedMethod()
+ {
+ $consumer = new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue'));
+ $consumer->acknowledge(new MongodbMessage());
+ }
+
+ public function testCouldSetAndGetPollingInterval()
+ {
+ $destination = new MongodbDestination('queue');
+
+ $consumer = new MongodbConsumer($this->createContextMock(), $destination);
+ $consumer->setPollingInterval(123456);
+
+ $this->assertEquals(123456, $consumer->getPollingInterval());
+ }
+
+ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()
+ {
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage(
+ 'The message must be an instance of '.
+ 'Enqueue\Mongodb\MongodbMessage '.
+ 'but it is Enqueue\Mongodb\Tests\InvalidMessage.'
+ );
+
+ $consumer = new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue'));
+ $consumer->reject(new InvalidMessage());
+ }
+
+ public function testShouldDoNothingOnReject()
+ {
+ $queue = new MongodbDestination('queue');
+
+ $message = new MongodbMessage();
+ $message->setBody('theBody');
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->never())
+ ->method('createProducer')
+ ;
+
+ $consumer = new MongodbConsumer($context, $queue);
+
+ $consumer->reject($message);
+ }
+
+ public function testRejectShouldReSendMessageToSameQueueOnRequeue()
+ {
+ $queue = new MongodbDestination('queue');
+
+ $message = new MongodbMessage();
+ $message->setBody('theBody');
+
+ $producerMock = $this->createProducerMock();
+ $producerMock
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($queue), $this->identicalTo($message))
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->will($this->returnValue($producerMock))
+ ;
+
+ $consumer = new MongodbConsumer($context, $queue);
+
+ $consumer->reject($message, true);
+ }
+
+ /**
+ * @return MongodbProducer|\PHPUnit_Framework_MockObject_MockObject
+ */
+ private function createProducerMock()
+ {
+ return $this->createMock(MongodbProducer::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(MongodbContext::class);
+ }
+}
+
+class InvalidMessage implements PsrMessage
+{
+ public function getBody()
+ {
+ }
+
+ public function setBody($body)
+ {
+ }
+
+ public function setProperties(array $properties)
+ {
+ }
+
+ public function getProperties()
+ {
+ }
+
+ public function setProperty($name, $value)
+ {
+ }
+
+ public function getProperty($name, $default = null)
+ {
+ }
+
+ public function setHeaders(array $headers)
+ {
+ }
+
+ public function getHeaders()
+ {
+ }
+
+ public function setHeader($name, $value)
+ {
+ }
+
+ public function getHeader($name, $default = null)
+ {
+ }
+
+ public function setRedelivered($redelivered)
+ {
+ }
+
+ public function isRedelivered()
+ {
+ }
+
+ public function setCorrelationId($correlationId)
+ {
+ }
+
+ public function getCorrelationId()
+ {
+ }
+
+ public function setMessageId($messageId)
+ {
+ }
+
+ public function getMessageId()
+ {
+ }
+
+ public function getTimestamp()
+ {
+ }
+
+ public function setTimestamp($timestamp)
+ {
+ }
+
+ public function setReplyTo($replyTo)
+ {
+ }
+
+ public function getReplyTo()
+ {
+ }
+}
diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php
new file mode 100644
index 000000000..0c06397d9
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbContextTest.php
@@ -0,0 +1,169 @@
+assertClassImplements(PsrContext::class, MongodbContext::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new MongodbContext($this->createClientMock());
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $context = new MongodbContext($this->createClientMock(), []);
+
+ $this->assertAttributeEquals([
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => null,
+ ], 'config', $context);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $client = new MongodbContext($this->createClientMock(), [
+ 'dbname' => 'testDbName',
+ 'collection_name' => 'testCollectionName',
+ 'polling_interval' => 123456,
+ ]);
+
+ $this->assertAttributeEquals([
+ 'dbname' => 'testDbName',
+ 'collection_name' => 'testCollectionName',
+ 'polling_interval' => 123456,
+ ], 'config', $client);
+ }
+
+ public function testShouldCreateMessage()
+ {
+ $context = new MongodbContext($this->createClientMock());
+ $message = $context->createMessage('body', ['pkey' => 'pval'], ['hkey' => 'hval']);
+
+ $this->assertInstanceOf(MongodbMessage::class, $message);
+ $this->assertEquals('body', $message->getBody());
+ $this->assertEquals(['pkey' => 'pval'], $message->getProperties());
+ $this->assertEquals(['hkey' => 'hval'], $message->getHeaders());
+ $this->assertNull($message->getPriority());
+ $this->assertFalse($message->isRedelivered());
+ }
+
+ public function testShouldCreateTopic()
+ {
+ $context = new MongodbContext($this->createClientMock());
+ $topic = $context->createTopic('topic');
+
+ $this->assertInstanceOf(MongodbDestination::class, $topic);
+ $this->assertEquals('topic', $topic->getTopicName());
+ }
+
+ public function testShouldCreateQueue()
+ {
+ $context = new MongodbContext($this->createClientMock());
+ $queue = $context->createQueue('queue');
+
+ $this->assertInstanceOf(MongodbDestination::class, $queue);
+ $this->assertEquals('queue', $queue->getName());
+ }
+
+ public function testShouldCreateProducer()
+ {
+ $context = new MongodbContext($this->createClientMock());
+
+ $this->assertInstanceOf(MongodbProducer::class, $context->createProducer());
+ }
+
+ public function testShouldCreateConsumer()
+ {
+ $context = new MongodbContext($this->createClientMock());
+
+ $this->assertInstanceOf(MongodbConsumer::class, $context->createConsumer(new MongodbDestination('')));
+ }
+
+ public function testShouldCreateMessageConsumerAndSetPollingInterval()
+ {
+ $context = new MongodbContext($this->createClientMock(), [
+ 'polling_interval' => 123456,
+ ]);
+
+ $consumer = $context->createConsumer(new MongodbDestination(''));
+
+ $this->assertInstanceOf(MongodbConsumer::class, $consumer);
+ $this->assertEquals(123456, $consumer->getPollingInterval());
+ }
+
+ public function testShouldThrowIfDestinationIsInvalidInstanceType()
+ {
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage(
+ 'The destination must be an instance of '.
+ 'Enqueue\Mongodb\MongodbDestination but got '.
+ 'Enqueue\Mongodb\Tests\NotSupportedDestination2.'
+ );
+
+ $context = new MongodbContext($this->createClientMock());
+
+ $this->assertInstanceOf(MongodbConsumer::class, $context->createConsumer(new NotSupportedDestination2()));
+ }
+
+ public function testShouldReturnInstanceOfClient()
+ {
+ $context = new MongodbContext($client = $this->createClientMock());
+
+ $this->assertSame($client, $context->getClient());
+ }
+
+ public function testShouldReturnConfig()
+ {
+ $context = new MongodbContext($this->createClientMock());
+
+ $this->assertSame([
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => null,
+ ], $context->getConfig());
+ }
+
+ public function testShouldThrowBadMethodCallExceptionOncreateTemporaryQueueCall()
+ {
+ $context = new MongodbContext($this->createClientMock());
+
+ $this->expectException(\BadMethodCallException::class);
+ $this->expectExceptionMessage('Mongodb transport does not support temporary queues');
+
+ $context->createTemporaryQueue();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Client
+ */
+ private function createClientMock()
+ {
+ return $this->createMock(Client::class);
+ }
+}
+
+class NotSupportedDestination2 implements PsrDestination
+{
+}
diff --git a/pkg/mongodb/Tests/MongodbDestinationTest.php b/pkg/mongodb/Tests/MongodbDestinationTest.php
new file mode 100644
index 000000000..ef81e8fc9
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbDestinationTest.php
@@ -0,0 +1,40 @@
+assertClassImplements(PsrDestination::class, MongodbDestination::class);
+ }
+
+ public function testShouldImplementTopicInterface()
+ {
+ $this->assertClassImplements(PsrTopic::class, MongodbDestination::class);
+ }
+
+ public function testShouldImplementQueueInterface()
+ {
+ $this->assertClassImplements(PsrQueue::class, MongodbDestination::class);
+ }
+
+ public function testShouldReturnTopicAndQueuePreviouslySetInConstructor()
+ {
+ $destination = new MongodbDestination('topic-or-queue-name');
+
+ $this->assertSame('topic-or-queue-name', $destination->getName());
+ $this->assertSame('topic-or-queue-name', $destination->getTopicName());
+ }
+}
diff --git a/pkg/mongodb/Tests/MongodbMessageTest.php b/pkg/mongodb/Tests/MongodbMessageTest.php
new file mode 100644
index 000000000..b9e5e7501
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbMessageTest.php
@@ -0,0 +1,94 @@
+assertSame('', $message->getBody());
+ $this->assertSame([], $message->getProperties());
+ $this->assertSame([], $message->getHeaders());
+ }
+
+ public function testCouldBeConstructedWithOptionalArguments()
+ {
+ $message = new MongodbMessage('theBody', ['barProp' => 'barPropVal'], ['fooHeader' => 'fooHeaderVal']);
+
+ $this->assertSame('theBody', $message->getBody());
+ $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties());
+ $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
+ }
+
+ public function testShouldSetNullPriorityInConstructor()
+ {
+ $message = new MongodbMessage();
+
+ $this->assertNull($message->getPriority());
+ }
+
+ public function testShouldSetDelayToNullInConstructor()
+ {
+ $message = new MongodbMessage();
+
+ $this->assertNull($message->getDeliveryDelay());
+ }
+
+ public function testShouldSetCorrelationIdAsHeader()
+ {
+ $message = new MongodbMessage();
+ $message->setCorrelationId('theCorrelationId');
+
+ $this->assertSame(['correlation_id' => 'theCorrelationId'], $message->getHeaders());
+ }
+
+ public function testShouldSetPublishedAtToNullInConstructor()
+ {
+ $message = new MongodbMessage();
+
+ $this->assertNull($message->getPublishedAt());
+ }
+
+ public function testShouldSetMessageIdAsHeader()
+ {
+ $message = new MongodbMessage();
+ $message->setMessageId('theMessageId');
+
+ $this->assertSame(['message_id' => 'theMessageId'], $message->getHeaders());
+ }
+
+ public function testShouldSetTimestampAsHeader()
+ {
+ $message = new MongodbMessage();
+ $message->setTimestamp(12345);
+
+ $this->assertSame(['timestamp' => 12345], $message->getHeaders());
+ }
+
+ public function testShouldSetReplyToAsHeader()
+ {
+ $message = new MongodbMessage();
+ $message->setReplyTo('theReply');
+
+ $this->assertSame(['reply_to' => 'theReply'], $message->getHeaders());
+ }
+
+ public function testShouldAllowGetPreviouslySetPublishedAtTime()
+ {
+ $message = new MongodbMessage();
+
+ $message->setPublishedAt(123);
+
+ $this->assertSame(123, $message->getPublishedAt());
+ }
+}
diff --git a/pkg/mongodb/Tests/MongodbProducerTest.php b/pkg/mongodb/Tests/MongodbProducerTest.php
new file mode 100644
index 000000000..ca59d5520
--- /dev/null
+++ b/pkg/mongodb/Tests/MongodbProducerTest.php
@@ -0,0 +1,69 @@
+assertClassImplements(PsrProducer::class, MongodbProducer::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new MongodbProducer($this->createContextMock());
+ }
+
+ public function testShouldThrowIfBodyOfInvalidType()
+ {
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage('The message body must be a scalar or null. Got: stdClass');
+
+ $producer = new MongodbProducer($this->createContextMock());
+
+ $message = new MongodbMessage(new \stdClass());
+
+ $producer->send(new MongodbDestination(''), $message);
+ }
+
+ public function testShouldThrowIfDestinationOfInvalidType()
+ {
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage(
+ 'The destination must be an instance of '.
+ 'Enqueue\Mongodb\MongodbDestination but got '.
+ 'Enqueue\Mongodb\Tests\NotSupportedDestination1.'
+ );
+
+ $producer = new MongodbProducer($this->createContextMock());
+
+ $producer->send(new NotSupportedDestination1(), new MongodbMessage());
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(MongodbContext::class);
+ }
+}
+
+class NotSupportedDestination1 implements PsrDestination
+{
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php
new file mode 100644
index 000000000..324fe52a6
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php
@@ -0,0 +1,20 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbMessageTest.php b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php
new file mode 100644
index 000000000..51ab55ac5
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php
@@ -0,0 +1,20 @@
+buildMongodbContext()->createProducer();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php
new file mode 100644
index 000000000..a555461a7
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php
@@ -0,0 +1,20 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php
new file mode 100644
index 000000000..a5eb3511d
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php
new file mode 100644
index 000000000..5400820b8
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php
@@ -0,0 +1,53 @@
+publishedAt = (int) (microtime(true) * 10000);
+ }
+
+ /**
+ * @return PsrContext
+ */
+ protected function createContext()
+ {
+ return $this->buildMongodbContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param MongodbContext $context
+ *
+ * @return MongodbMessage
+ */
+ protected function createMessage(PsrContext $context, $body)
+ {
+ /** @var MongodbMessage $message */
+ $message = parent::createMessage($context, $body);
+
+ // in order to test priorities correctly we have to make sure the messages were sent in the same time.
+ $message->setPublishedAt($this->publishedAt);
+
+ return $message;
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php
new file mode 100644
index 000000000..d87ac10e9
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php
new file mode 100644
index 000000000..992c0626e
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php
new file mode 100644
index 000000000..c539386f7
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php
new file mode 100644
index 000000000..ea4febcc2
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php
new file mode 100644
index 000000000..1e1be32c1
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php
@@ -0,0 +1,23 @@
+buildMongodbContext();
+ }
+}
diff --git a/pkg/mongodb/Tests/Spec/MongodbTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php
new file mode 100644
index 000000000..14a79f5a5
--- /dev/null
+++ b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php
@@ -0,0 +1,20 @@
+assertClassImplements(TransportFactoryInterface::class, MongodbTransportFactory::class);
+ }
+
+ public function testCouldBeConstructedWithDefaultName()
+ {
+ $transport = new MongodbTransportFactory();
+
+ $this->assertEquals('mongodb', $transport->getName());
+ }
+
+ public function testCouldBeConstructedWithCustomName()
+ {
+ $transport = new MongodbTransportFactory('theCustomName');
+
+ $this->assertEquals('theCustomName', $transport->getName());
+ }
+
+ public function testShouldAllowAddConfiguration()
+ {
+ $transport = new MongodbTransportFactory();
+ $tb = new TreeBuilder();
+ $rootNode = $tb->root('foo');
+
+ $transport->addConfiguration($rootNode);
+ $processor = new Processor();
+ $config = $processor->process($tb->buildTree(), [[
+ 'dsn' => 'mongodb://127.0.0.1/',
+ ]]);
+
+ $this->assertEquals([
+ 'dsn' => 'mongodb://127.0.0.1/',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ], $config);
+ }
+
+ public function testShouldAllowAddConfigurationAsString()
+ {
+ $transport = new MongodbTransportFactory();
+ $tb = new TreeBuilder();
+ $rootNode = $tb->root('foo');
+
+ $transport->addConfiguration($rootNode);
+ $processor = new Processor();
+ $config = $processor->process($tb->buildTree(), ['mysqlDSN']);
+
+ $this->assertEquals([
+ 'dsn' => 'mysqlDSN',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ], $config);
+ }
+
+ public function testShouldCreateMongodbConnectionFactory()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new MongodbTransportFactory();
+
+ $serviceId = $transport->createConnectionFactory($container, [
+ 'dsn' => 'mysqlDSN',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ]);
+
+ $this->assertTrue($container->hasDefinition($serviceId));
+ $factory = $container->getDefinition($serviceId);
+ $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass());
+
+ $this->assertSame([
+ 'dsn' => 'mysqlDSN',
+ 'dbname' => 'enqueue',
+ 'collection_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ], $factory->getArgument(0));
+ }
+
+ public function testShouldCreateConnectionFactoryFromDsnString()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new MongodbTransportFactory();
+
+ $serviceId = $transport->createConnectionFactory($container, [
+ 'dsn' => 'theDSN',
+ 'connection' => [],
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ]);
+
+ $this->assertTrue($container->hasDefinition($serviceId));
+ $factory = $container->getDefinition($serviceId);
+ $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass());
+ $this->assertSame('theDSN', $factory->getArgument(0)['dsn']);
+ }
+
+ public function testShouldCreateContext()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new MongodbTransportFactory();
+
+ $serviceId = $transport->createContext($container, []);
+
+ $this->assertEquals('enqueue.transport.mongodb.context', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $context = $container->getDefinition('enqueue.transport.mongodb.context');
+ $this->assertInstanceOf(Reference::class, $context->getFactory()[0]);
+ $this->assertEquals('enqueue.transport.mongodb.connection_factory', (string) $context->getFactory()[0]);
+ $this->assertEquals('createContext', $context->getFactory()[1]);
+ }
+
+ public function testShouldCreateDriver()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new MongodbTransportFactory();
+
+ $serviceId = $transport->createDriver($container, []);
+
+ $this->assertEquals('enqueue.client.mongodb.driver', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $driver = $container->getDefinition($serviceId);
+ $this->assertSame(MongodbDriver::class, $driver->getClass());
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(0));
+ $this->assertEquals('enqueue.transport.mongodb.context', (string) $driver->getArgument(0));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(1));
+ $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(2));
+ $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
+ }
+}
diff --git a/pkg/mongodb/composer.json b/pkg/mongodb/composer.json
new file mode 100644
index 000000000..59a2b6caa
--- /dev/null
+++ b/pkg/mongodb/composer.json
@@ -0,0 +1,49 @@
+{
+ "name": "enqueue/mongodb",
+ "type": "library",
+ "description": "Message Queue MongoDB Transport",
+ "keywords": [
+ "messaging",
+ "queue",
+ "mongodb"
+ ],
+ "homepage": "https://enqueue.forma-pro.com/",
+ "license": "MIT",
+ "require": {
+ "php": "^7.0",
+ "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1",
+ "mongodb/mongodb": "^1.2",
+ "ext-mongodb": "^1.3"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.4.0",
+ "queue-interop/queue-spec": "^0.5.5@dev",
+ "enqueue/test": "^0.8.25@dev",
+ "enqueue/enqueue": "^0.8@dev",
+ "enqueue/null": "^0.8@dev"
+ },
+ "support": {
+ "email": "opensource@forma-pro.com",
+ "issues": "https://github.com/php-enqueue/enqueue-dev/issues",
+ "forum": "https://gitter.im/php-enqueue/Lobby",
+ "source": "https://github.com/php-enqueue/enqueue-dev",
+ "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md"
+ },
+ "autoload": {
+ "psr-4": {
+ "Enqueue\\Mongodb\\": ""
+ },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "suggest": {
+ "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features"
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.8.x-dev"
+ }
+ }
+}
diff --git a/pkg/mongodb/phpunit.xml.dist b/pkg/mongodb/phpunit.xml.dist
new file mode 100644
index 000000000..1f34af01d
--- /dev/null
+++ b/pkg/mongodb/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Tests
+
+
+
+
diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php
index c0cfab616..af9de4110 100644
--- a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php
+++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php
@@ -13,6 +13,9 @@
use Enqueue\Test\ClassExtensionTrait;
use Interop\Queue\PsrProducer;
+/**
+ * @group rdkafka
+ */
class RdKafkaDriverTest extends \PHPUnit_Framework_TestCase
{
use ClassExtensionTrait;
diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php
index 950034205..6816e1690 100644
--- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php
+++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php
@@ -13,6 +13,9 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
+/**
+ * @group rdkafka
+ */
class RdKafkaTransportFactoryTest extends TestCase
{
use ClassExtensionTrait;
diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php
index 640211781..000743259 100644
--- a/pkg/simple-client/SimpleClient.php
+++ b/pkg/simple-client/SimpleClient.php
@@ -22,7 +22,11 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
+use Enqueue\Mongodb\MongodbConnectionFactory;
+use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
+use Enqueue\RdKafka\RdKafkaConnectionFactory;
+use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Rpc\Promise;
@@ -351,6 +355,18 @@ class_exists(AmqpLibConnectionFactory::class)
$extension->addTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps']));
}
+ if (class_exists(RdKafkaConnectionFactory::class)) {
+ $extension->addTransportFactory(new RdKafkaTransportFactory('rdkafka'));
+ } else {
+ $extension->addTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
+ }
+
+ if (class_exists(MongodbConnectionFactory::class)) {
+ $extension->addTransportFactory(new MongodbTransportFactory('mongodb'));
+ } else {
+ $extension->addTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb']));
+ }
+
return $extension;
}
diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php
index 75d508c8d..87d60034b 100644
--- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php
+++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php
@@ -7,8 +7,8 @@
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\Consumption\Result;
use Enqueue\SimpleClient\SimpleClient;
-use Enqueue\Test\RabbitmqAmqpExtension;
use Enqueue\Test\RabbitManagementExtensionTrait;
+use Enqueue\Test\RabbitmqAmqpExtension;
use Interop\Queue\PsrMessage;
use PHPUnit\Framework\TestCase;
@@ -73,6 +73,27 @@ public function transportConfigDataProvider()
],
],
]];
+
+ yield [[
+ 'transport' => [
+ 'default' => 'rabbitmq_amqp',
+ 'rabbitmq_amqp' => [
+ 'driver' => 'ext',
+ 'host' => getenv('RABBITMQ_HOST'),
+ 'port' => getenv('RABBITMQ_AMQP__PORT'),
+ 'user' => getenv('RABBITMQ_USER'),
+ 'pass' => getenv('RABBITMQ_PASSWORD'),
+ 'vhost' => getenv('RABBITMQ_VHOST'),
+ ],
+ ],
+ ]];
+
+ yield 'mongodb_dsn' => [[
+ 'transport' => [
+ 'default' => 'mongodb',
+ 'mongodb' => getenv('MONGO_DSN'),
+ ],
+ ]];
}
/**
diff --git a/pkg/test/MongodbExtensionTrait.php b/pkg/test/MongodbExtensionTrait.php
new file mode 100644
index 000000000..4d94fca40
--- /dev/null
+++ b/pkg/test/MongodbExtensionTrait.php
@@ -0,0 +1,21 @@
+markTestSkipped('The MONGO_DSN env is not available. Skip tests');
+ }
+
+ $factory = new MongodbConnectionFactory(['dsn' => $env]);
+
+ $context = $factory->createContext();
+
+ return $context;
+ }
+}