Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaud-lb committed Dec 18, 2016
1 parent 319967c commit 0d558b4
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 0 deletions.
88 changes: 88 additions & 0 deletions tests/produce_consume.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
--TEST--
Produce, consume
--SKIPIF--
<?php
file_exists(__DIR__."/test_env.php") || die("skip");
--FILE--
<?php

require __DIR__."/test_env.php";

$delivered = 0;

$conf = new RdKafka\Conf();
$conf->set('broker.version.fallback', TEST_KAFKA_BROKER_VERSION);
$conf->setErrorCb(function ($producer, $err, $errstr) {
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
exit;
});
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
if ($msg->err) {
throw new Exception("Message delivery failed: " . $msg->errstr());
}
$delivered++;
});

$producer = new RdKafka\Producer($conf);

if ($producer->addBrokers(TEST_KAFKA_BROKERS) < 1) {
echo "Failed adding brokers\n";
exit;
}

$topicName = sprintf("test_rdkafka_%s", uniqid());

$topic = $producer->newTopic($topicName);

if (!$producer->getMetadata(false, $topic, 2*1000)) {
echo "Failed to get metadata, is broker down?\n";
}

for ($i = 0; $i < 10; $i++) {
$topic->produce(0, 0, "message $i");
$producer->poll(0);
}

while ($producer->getOutQLen()) {
$producer->poll(50);
}

printf("%d messages delivered\n", $delivered);

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers(TEST_KAFKA_BROKERS);

$topic = $consumer->newTopic($topicName);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

$messages = [];

while (true) {
$msg = $topic->consume(0, 60*1000);
if (!$msg) {
continue;
}
switch ($msg->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
printf("Got message: %s\n", $msg->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "EOF\n";
break 2;
default:
throw new Exception($message->errstr());
}
}
--EXPECT--
10 messages delivered
Got message: message 0
Got message: message 1
Got message: message 2
Got message: message 3
Got message: message 4
Got message: message 5
Got message: message 6
Got message: message 7
Got message: message 8
Got message: message 9
EOF
101 changes: 101 additions & 0 deletions tests/produce_consume_queue.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
--TEST--
Produce, consume queue
--SKIPIF--
<?php
file_exists(__DIR__."/test_env.php") || die("skip");
--FILE--
<?php

require __DIR__."/test_env.php";

$delivered = 0;

$conf = new RdKafka\Conf();
$conf->set('broker.version.fallback', TEST_KAFKA_BROKER_VERSION);
$conf->setErrorCb(function ($producer, $err, $errstr) {
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
exit;
});
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
if ($msg->err) {
throw new Exception("Message delivery failed: " . $msg->errstr());
}
$delivered++;
});

$producer = new RdKafka\Producer($conf);

if ($producer->addBrokers(TEST_KAFKA_BROKERS) < 1) {
echo "Failed adding brokers\n";
exit;
}

$topicNames = [
sprintf("test_rdkafka_0_%s", uniqid()),
sprintf("test_rdkafka_1_%s", uniqid()),
];

$topics = array_map(function ($topicName) use ($producer) {
return $producer->newTopic($topicName);
}, $topicNames);

if (!$producer->getMetadata(false, reset($topics), 2*1000)) {
echo "Failed to get metadata, is broker down?\n";
}

for ($i = 0; $i < 10; $i++) {
$topics[$i%2]->produce(0, 0, "message $i");
$producer->poll(0);
}

while ($producer->getOutQLen()) {
$producer->poll(50);
}

printf("%d messages delivered\n", $delivered);

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers(TEST_KAFKA_BROKERS);

$queue = $consumer->newQueue();

array_walk($topicNames, function ($topicName) use ($consumer, $queue) {
$topic = $consumer->newTopic($topicName);
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
});

$messages = [];
$eof = 0;

while ($eof < 2) {
$msg = $queue->consume(60*1000);
if (!$msg) {
continue;
}
switch ($msg->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$eof++;
break;
default:
throw new Exception($message->errstr());
}
}

sort($messages);
echo implode("\n", $messages), "\n";

--EXPECTF--
10 messages delivered
Got message: message 0 from test_rdkafka_0_%s
Got message: message 1 from test_rdkafka_1_%s
Got message: message 2 from test_rdkafka_0_%s
Got message: message 3 from test_rdkafka_1_%s
Got message: message 4 from test_rdkafka_0_%s
Got message: message 5 from test_rdkafka_1_%s
Got message: message 6 from test_rdkafka_0_%s
Got message: message 7 from test_rdkafka_1_%s
Got message: message 8 from test_rdkafka_0_%s
Got message: message 9 from test_rdkafka_1_%s

0 comments on commit 0d558b4

Please sign in to comment.