
An Introduction To PZQ

ianbarber edited this page Sep 26, 2011 · 2 revisions


PZQ is a persistent store daemon by Mikko Koppanen which uses the ZeroMQ messaging socket library (version 2) for communication and Kyoto Cabinet for on-disk and in-memory data storage. Because it communicates over ZeroMQ, the service is almost entirely language independent: anything with ZeroMQ bindings can talk to it. The examples here use PHP and the client library supplied with PZQ.

Following the ZeroMQ and Unix philosophy of doing a few things and doing them well, PZQ is designed as a simple store-and-forward device. It targets two main use cases, each discussed below:

  1. Provide a replacement for ZeroMQ SWAP
  2. Perform as an asynchronous job queue

PZQ is not the solution to every problem. It is a simple component that can form part of a good solution under certain constraints, such as those laid out here.

Replacing ZeroMQ SWAP

ZeroMQ SWAP allows ZeroMQ to exceed its High Water Mark (HWM) setting. The HWM exists to stop queued messages from exhausting the system's memory, and SWAP offloads parts of the queue to disk. However, SWAP has several problems: it is a 'black box' that messages go into with no way to inspect them, and it is not persistent, so any messages in SWAP are lost if the process restarts. For these reasons, among others, SWAP has been removed from later versions of ZeroMQ.

PZQ provides a more elegant persistence solution, but comes with a few requirements of its own. It runs as a separate daemon process, so it has to be managed like any other service. It also lacks the message delivery information that SWAP had inside the ZeroMQ library, so it relies on consumers explicitly acknowledging receipt of each message. In the same way, PZQ explicitly acknowledges receipt of each message to its senders. In return, PZQ can recover messages from disk after a restart, resend lost messages after a configurable timeout, expose a variety of tuneable parameters, and report essential statistics for monitoring, which makes it a much more stable and practical solution.

Architecturally, imagine a PUSH/PULL work distribution scenario in which work comes in to a central point that fans it out to a variable number of workers. At certain times every worker is busy with a larger process and none can service the central queue, so the queue fills up and has to fall back on SWAP. We'll keep the same basic sockets, so the interface stays mostly unchanged, but add the PZQ device in the middle.

We can run pzq with a command like this:

./pzq --sync-divisor 10 --ack-timeout 2000000 --receive-dsn tcp://*:11131 --publish-dsn tcp://*:11133

For this example we're specifying some parameters explicitly even where they match the default values. --sync-divisor determines how often messages are flushed to disk. The default of 0 syncs after every message; here we've raised it because we expect fairly high throughput and are willing to trade a little reliability for performance. With this setting, for example, a sudden power outage may leave some queued messages unwritten to disk.
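To make that trade-off concrete, here is a small model of the flush decision. It assumes that a divisor of N means "flush on every Nth message" and that 0 means "flush after every message"; that reading is inferred from the description above rather than taken from PZQ's source, and the function names are hypothetical.

```php
<?php
// Hypothetical model of --sync-divisor, ASSUMING "divisor N" means
// "flush to disk on every Nth message" and 0 means "flush every message".
// Illustration only; this is not code from PZQ.

function should_sync(int $messageCount, int $syncDivisor): bool
{
    if ($syncDivisor <= 0) {
        return true; // default of 0: sync after every message
    }
    return $messageCount % $syncDivisor === 0;
}

// Worst case under this model: messages written since the last flush
// that could be lost in a sudden power failure.
function max_unsynced(int $syncDivisor): int
{
    return max(0, $syncDivisor - 1);
}
```

Under this model, the value of 10 used above leaves at most 9 messages unflushed at any moment, while the default of 0 leaves none.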

--ack-timeout sets how many microseconds must pass after PZQ sends a message before it considers the message lost. Since we're using PZQ as a straightforward device, we expect downstream sockets to ack as soon as they receive the message, so we've reduced this from its default of 5 seconds to 2 seconds (2000000 microseconds).
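Because --ack-timeout is given in microseconds, it is easy to be off by a factor of a thousand. The sketch below converts seconds into the value the flag expects and models the rule just described, where a message counts as lost once the timeout has elapsed without an ack. The helper names are hypothetical and this is not PZQ's own code.

```php
<?php
// Hypothetical helpers for reasoning about --ack-timeout (microseconds).
// Not part of PZQ or PZQClient.php.

const MICROSECONDS_PER_SECOND = 1000000;

// Convert a timeout in seconds to the microsecond value --ack-timeout takes
function seconds_to_ack_timeout(float $seconds): int
{
    return (int) round($seconds * MICROSECONDS_PER_SECOND);
}

// Model of the resend rule: once the timeout has passed since the send
// with no ack, the message is treated as lost and will be resent.
function is_considered_lost(int $sentAtUs, int $nowUs, int $ackTimeoutUs): bool
{
    return ($nowUs - $sentAtUs) >= $ackTimeoutUs;
}
```

For instance, `seconds_to_ack_timeout(2)` gives the `2000000` passed above, and the 5 second default corresponds to `5000000`.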

The two DSNs are the input and output sockets, and are specified as with normal ZeroMQ sockets.

We can then send work, for example:

<?php

include 'PZQClient.php';

$p = new PZQProducer ("tcp://127.0.0.1:11131");

for ($i = 0; $i < 100; $i++)
{
    $message = new PZQMessage ();
    $message->set_id ($i);
    $message->set_message (array ("work type", "argument"));

    $p->produce ($message);
}

We can then run as many consumers as we like to do the work; each one pulls from the queue like this:

<?php

include 'PZQClient.php';

$c = new PZQConsumer ("tcp://127.0.0.1:11133");

for ($i = 0; $i < 500000; $i++)
{
    $message = $c->consume ();
    $body = $message->get_message();
    echo "Doing "  . $body[0] . " - for ID "
          . $message->get_id() . PHP_EOL;
    $c->ack ($message);
}

PZQ also allows us to monitor the queue via a monitoring socket. Querying it returns an array with the number of messages, the size of the databases, and a few other statistics.

<?php
include 'PZQClient.php';

$m = new PZQMonitor ("ipc:///tmp/pzq-monitor");
var_dump ($m->get_stats ());

Of course, errors can still occur. For example, what if PZQ runs out of disk space?

We have several ways to control the size of the databases. PZQ keeps two datastores active at any time: the message database and the in-flight database. The in-flight database holds messages that have been sent but not yet acknowledged. Its size can be capped with the --inflight-size parameter, which sets the maximum size of that database in bytes; if adding a message would exceed this, the least recently used entries are dropped.
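The eviction rule for the in-flight database can be illustrated with a small size-bounded LRU map keyed by message ID. This is a hypothetical stand-in that counts payload bytes only; PZQ's own implementation on top of Kyoto Cabinet may account for size differently.

```php
<?php
// Hypothetical size-bounded LRU store modelling an in-flight database
// capped at a maximum number of bytes (payload bytes only).
// Not PZQ's code; requires PHP 8.0+.

final class InflightLru
{
    /** @var array<string, string> message ID => payload, oldest first */
    private array $entries = [];
    private int $bytes = 0;

    public function __construct(private int $maxBytes) {}

    public function put(string $id, string $payload): void
    {
        $this->remove($id); // re-adding an ID refreshes its position
        $this->entries[$id] = $payload; // appended, so most recently used
        $this->bytes += strlen($payload);

        // Drop least recently used entries until back under the cap
        while ($this->bytes > $this->maxBytes && $this->entries) {
            $this->remove((string) array_key_first($this->entries));
        }
    }

    public function get(string $id): ?string
    {
        if (!isset($this->entries[$id])) {
            return null;
        }
        $payload = $this->entries[$id];
        // Move the entry to the end to mark it as most recently used
        unset($this->entries[$id]);
        $this->entries[$id] = $payload;
        return $payload;
    }

    // Stands in for a consumer acknowledging the message
    public function remove(string $id): void
    {
        if (isset($this->entries[$id])) {
            $this->bytes -= strlen($this->entries[$id]);
            unset($this->entries[$id]);
        }
    }

    public function has(string $id): bool
    {
        return isset($this->entries[$id]);
    }

    public function size(): int
    {
        return $this->bytes;
    }
}
```

In this model, `remove()` plays the part of an acknowledgement, and `put()` on a full store drops the least recently used entries first.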

The size of the databases on disk is another potential issue. PZQ reports the current size of both databases via the monitoring socket, but in general the server's available disk space should be monitored as with any datastore, for example via SNMP, Monit, or other monitoring tools. That said, in many queueing applications messages are small enough that a very large number would have to be stored before they took up significant disk space.
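Where a dedicated monitoring tool isn't available, a free-space check for the volume holding PZQ's data files can be written with PHP's built-in `disk_free_space()` and `disk_total_space()`. This is a minimal sketch: the 10% threshold and the `/var/lib/pzq` path in the usage note are assumptions, not PZQ defaults.

```php
<?php
// Minimal free-disk-space check. The default threshold (10% free) is an
// assumption; tune it for your own environment.

function free_fraction(float $free, float $total): float
{
    return $total > 0 ? $free / $total : 0.0;
}

function disk_is_low(float $free, float $total, float $minFreeFraction = 0.10): bool
{
    return free_fraction($free, $total) < $minFreeFraction;
}

// Returns true when the volume containing $dir is running low on space
function check_volume(string $dir, float $minFreeFraction = 0.10): bool
{
    $free = disk_free_space($dir);
    $total = disk_total_space($dir);
    if ($free === false || $total === false) {
        throw new RuntimeException("Cannot read disk usage for $dir");
    }
    return disk_is_low($free, $total, $minFreeFraction);
}
```

A cron job could then call `check_volume('/var/lib/pzq')` (a hypothetical data directory) and raise an alert when it returns true.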

PZQ as a job queue

Often work has to be done on a separate system, or deferred until after a particular script invocation has finished. Generally this means queueing the work for later, either in a general system like a database table, a targeted job queue like Gearman, or a fully fledged message queue service. While a messaging socket library like ZeroMQ is excellent for messaging, some long-running process is required to act as the queue. PZQ fills this role well, and allows workers and clients to come and go freely.

For this example, we'll move email sending out of a script, for instance when a user asks to share a link with a friend. Where the client script would have sent the email, it instead connects to PZQ and puts a message on the queue containing the email addresses and template details. PZQ passes each message on to an available worker, and workers pick up work at their own rate. To scale up email sending we simply run more workers, and PZQ distributes the work between them automatically.

For this, we’ll configure PZQ slightly differently than before:

./pzq --ack-timeout 60000000

Because we expect a longer process to happen before the item is acked, we give it a longer grace period (60 seconds) before the message is resent.

Our script then queues the email instead of sending it directly:

<?php
include 'PZQClient.php';

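// Validate the user-supplied addresses before queueing anything
$fromaddress = filter_var($_REQUEST['from'] ?? '', FILTER_VALIDATE_EMAIL);
$toaddress = filter_var($_REQUEST['email'] ?? '', FILTER_VALIDATE_EMAIL);
$template = 'sendtoafriend';

if ($fromaddress === false || $toaddress === false) {
    header('HTTP/1.1 400 Bad Request');
    exit;
}

// Queue the work for the email workers rather than sending it here
$p = new PZQProducer ("tcp://127.0.0.1:11131");
$message = new PZQMessage();
$message->set_message(array($fromaddress, $toaddress, $template));
$p->produce($message);

// 303 See Other is the correct redirect after a form submission
header('HTTP/1.1 303 See Other');
header("Location: /?message=sent");
exit;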

On the worker side, we can run the email sending processes as and when we need them. Each worker acknowledges a message only once it has actually sent the email. It also exits after handling a fixed number of messages, on the assumption that some external system (cron, for example) starts new workers at intervals.

<?php
include 'PZQClient.php';
$c = new PZQConsumer ("tcp://127.0.0.1:11132");

$i = 0;

while ($i++ < 1000) { // exit after handling this many requests
    echo "Consuming", PHP_EOL;
    $message = $c->consume();
    list($from, $to, $template) = $message->get_message();
    echo "Parsing ", $template, PHP_EOL;
    echo "Sending To ", $to, PHP_EOL;
    // Send message here; ack only once it has actually been sent
    $c->ack($message);
}
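The `// Send message here` placeholder in the worker could be filled in along the following lines. This is a hypothetical sketch: the subject, body text and use of a Reply-To header are placeholders, and template rendering is left out. The mailer is injectable so the function can be exercised without sending real email; by default it is PHP's `mail()`.

```php
<?php
// Hypothetical send step for the email worker. Subject, body and headers
// are placeholders; a real worker would render the named template.

function send_email(string $from, string $to, string $template, ?callable $mailer = null): bool
{
    $mailer = $mailer ?? 'mail'; // PHP's built-in mail() by default
    $subject = 'A link from ' . $from;
    $body = "Placeholder body for the '" . $template . "' template.";
    // Reply-To rather than From, so the server's own sender address is kept
    $headers = 'Reply-To: ' . $from;

    // mail() returns true only if the message was accepted for delivery
    return (bool) $mailer($to, $subject, $body, $headers);
}
```

In the loop, `$c->ack($message)` would then run only when `send_email($from, $to, $template)` returns true. Skipping the ack on failure leaves the message unacknowledged, so PZQ resends it once the ack timeout expires.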

The error cases are pretty simple:

  1. No email senders available

If there are no workers available, the message will be queued within PZQ normally until workers become available. The number of messages waiting in this way will be available for inspection via the monitoring options discussed in the first scenario.

  2. Email senders crash while processing the message

In this case, because the worker only acks after sending, PZQ will push the work out to be processed again once the ack timeout expires. However, this is likely to cause duplicate messages if there is only one email sender.

This is because messages are pushed onto an internal ZeroMQ queue, and ZeroMQ holds them until someone connects, which may well be after the in-flight timeout has expired. In that case PZQ assumes the messages are lost, even though they are in fact still queued within ZeroMQ, and resends them. This at-least-once delivery behavior is intentional, but it requires clients to be able to discard occasional duplicate messages; to help with this, each message carries a unique ID and a unique timestamp. The included PHP client library has some preemptive timeout detection that lets it discard these duplicates, but other clients may have to handle them themselves.
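Consumers without the PHP client's built-in detection can still cope with at-least-once delivery by remembering the IDs of recently processed messages. The class below is a hypothetical sketch, not part of PZQClient.php. It only remembers IDs within a single worker process and forgets the oldest once `$capacity` is reached, so several workers sharing a queue, or workers that restart often like the email worker above, would need a shared or persistent store instead.

```php
<?php
// Hypothetical consumer-side duplicate filter for at-least-once delivery.
// Keeps a bounded, insertion-ordered set of recently seen message IDs.
// Requires PHP 8.0+.

final class RecentIds
{
    /** @var array<string, true> */
    private array $seen = [];

    public function __construct(private int $capacity = 10000) {}

    // Returns true the first time an ID is offered, false for repeats
    public function first_time(string $id): bool
    {
        if (isset($this->seen[$id])) {
            return false;
        }
        $this->seen[$id] = true;
        // Forget the oldest ID once the set grows past its capacity
        if (count($this->seen) > $this->capacity) {
            unset($this->seen[array_key_first($this->seen)]);
        }
        return true;
    }
}
```

In a consumer loop, the work would be wrapped in `if ($recent->first_time((string) $message->get_id())) { ... }`, with `$c->ack($message)` still called for duplicates, since PZQ treats a message as delivered only once it is acked.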

  3. PZQ is unavailable

Every service needs restarting now and again for various reasons, and PZQ sits at the heart of this architecture. However, it is entirely possible to run two PZQ instances and connect the same client API to both, which would allow us to maintain high availability, though the client code shown so far does not do this.
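One way to use two instances is client-side failover: try the primary PZQ and fall back to the secondary if sending fails. This is a sketch under assumptions, not something the PZQ client provides. `$send` is a hypothetical callable that delivers a payload to one endpoint and throws on failure; wrapping `PZQProducer` this way would depend on how that class reports errors, which isn't covered here.

```php
<?php
// Hypothetical client-side failover across several PZQ endpoints.
// $send(string $endpoint, mixed $payload) must throw an Exception on failure.

function produce_with_failover(array $endpoints, $payload, callable $send): string
{
    $errors = [];
    foreach ($endpoints as $endpoint) {
        try {
            $send($endpoint, $payload);
            return $endpoint; // first instance that accepted the message
        } catch (Exception $e) {
            $errors[] = $endpoint . ': ' . $e->getMessage();
        }
    }
    throw new RuntimeException('All PZQ instances failed: ' . implode('; ', $errors));
}
```

Because a send that fails on a timeout may still have reached the first instance, failover can add to the duplicate deliveries discussed above, which consumers already need to tolerate.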
