Skip to content

calston/rhumba

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

79 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Rhumba

Build Status

Rhumba is an asynchronous job queue and distributed task service with scheduling.

Installation

$ pip install rhumba

Usage

Rhumba tasks are created as plugins configured on each Rhumba worker. The plugin must contain a class called Plugin and callable task methods should be prefixed with call_

from rhumba import RhumbaPlugin

class Plugin(RhumbaPlugin):
    def call_test(self, args):
        self.log("Test call %s" % repr(args))

A plugin is connected to a queue in the Rhumba YAML configuration file

queues:
    - name: myqueue
      plugin: myplugin

Plugins must exist in the PYTHON_PATH of the process

Then start up your Rhumba worker with twistd -n rhumba -c myconfig.yaml

(ve)colin@nemesis:~/rhumba$ twistd -n rhumba -c examples/rhumba.yml
2015-11-08 23:15:20+0200 [-] Log opened.
2015-11-08 23:15:20+0200 [-] twistd 15.4.0 (/home/colin/rhumba/ve/bin/python 2.7.6) starting up.
2015-11-08 23:15:20+0200 [-] reactor class: twisted.internet.epollreactor.EPollReactor.
2015-11-08 23:15:20+0200 [-] Starting Rhumba
2015-11-08 23:15:20+0200 [-] Starting queue testqueue: plugin=<examples.testplugin.Plugin object at 0x7f0836655190>
2015-11-08 23:15:21+0200 [RedisClient,client] Queing testqueue scheduled job {'message': 'crontest', 'version': 1, 'params': {}, 'id': 'd1f23d42865d11e58d6482576555349e'}
2015-11-08 23:15:21+0200 [RedisClient,client] [testqueue]: tick!

Task methods can return a deferred, do not implement blocking tasks.

from twisted.mail.smtp import sendmail
from rhumba import RhumbaPlugin

class Plugin(RhumbaPlugin):
    def mailSent(self, from, to, msg):
        self.log('Mail from %s to %s sent successfully' % (from, to))

    def call_sendemail(self, args):
        a = (args['from'], args['to'], args['message'])
        d = sendmail('localhost', *a)

        d.addCallback(self.mailSent, *a)
        
        return d

Scheduling tasks

Rhumbas @cron decorator can be used to schedule tasks.

from rhumba import RhumbaPlugin, cron

class Plugin(RhumbaPlugin):
    @cron(hour=12)
    def call_scheduled(self, args):
        self.log("It's 12pm!")

The cron decorator accepts 5 keyword arguments: secs, min, hour, day, month and weekday. All of these arguments except weekday can also accept a string of the format "*/n" in which case the task will be run every n seconds, minutes, hours, days or months. The arguments may contain any combination of requirements as long as they make sense.

For example you may have a task run every 5 minutes from 1pm to 2pm on Sundays in June.

from rhumba import RhumbaPlugin, cron, crontab

class Plugin(RhumbaPlugin):
    @cron(mins="*/5", hour=13, month=6, weekday=crontab.Sunday)
    def call_scheduled(self, args):
        self.log("It's 12pm!")

Matching ranges and lists is also possible, eg hour="1-3" or day=(5, 6, 9) and any combination of these.

from rhumba import RhumbaPlugin, cron, crontab

class Plugin(RhumbaPlugin):
    @cron(hour="8-17", weekday=crontab.Weekdays)
    def call_during_business_hours(self, args):
        # do stuff 

Tasks can also return JSON serialisable results.

Calling tasks

Rhumba provides a synchronous client suitable for use with frameworks like Django as an alternative to Celery, as well as an async client for Twisted.

Synchronous client example

from django.shortcuts import redirect
# etc... 

from rhumba.client import RhumbaClient

def sendemail(request):
    if request.method == "POST":
        form = SomeEmailForm(request.POST)
        if form.is_valid():
            c = RhumbaClient()
            c.queue('myqueue', 'sendemail', form.cleaned_data)

    return redirect('index')

You can not send Django ORM objects down the wire to Rhumba, if you need to access your database you have to reimplement those queries in Twisted

You can retrieve the result of a task (if one is returned) until it expires using the getResult method. Expiry can be set in the workers queue configuration using the expire option and defaults to 1 hour.

def sendemail(request):
    if request.method == "POST":
        form = SomeEmailForm(request.POST)
        if form.is_valid():
            c = RhumbaClient()
            taskid = c.queue('myqueue', 'sendemail', form.cleaned_data)

    return redirect('checkmail', id=taskid)

def checkmail(request, id):
    # This is a really silly example, do something sane instead...
    r = RhumbaClient().getResult('myqueue', id)
    if r:
        return redirect('index')
    else:
        return redirect('checkmail', id=id)

The async client rhumba.client.AsyncRhumbaClient provides the same API but returns deferreds as expected using the txredis client.

HTTP API

As with all new fangled software Rhumba provides an HTTP API on port 7701 which will return stats about queues and workers in the pool and also provides a convenient way to queue tasks and wait for them without connecting directly to the backend.

Cluster stats

GET /cluster/

{
    "workers": {
        "cthulhu": [
            {
                "status": "ready",
                "lastseen": 1448567076.76,
                "id": "ffc0dc9e947511e59f38448a5b5fd8c0"
            }
        ]
    },
    "queues": {
        "testqueue": {
            "waiting": 0, 
            "messages": {
                "crontest": {
                    "count": 345, 
                    "time": 22.99
                },
                "test": {
                    "count": 6,
                    "time": 52000.62
                }
            }
        }
    },
    "crons": {
        "testqueue": {
            "master": "ffc0dc9e947511e59f38448a5b5fd8c0:cthulhu",
            "methods": {
                "call_crontest": 1448567073.0
            }
        }
    }
}

Queues

GET /queues/

["testqueue"]

GET /queues/testqueue/call/test

{
    "uid": "15b81fb6947711e58154448a5b5fd8c0"
}

GET /queues/testqueue/result/15b81fb6947711e58154448a5b5fd8c0

{
    "result": ["Hello!"], 
    "time": 1448567500.198432
}

GET /queues/testqueue/wait/test

{
    "result": ["Hello!"], 
    "time": 1448567475.792436
}

Both wait and call accept HTTP POST requests with a json payload

POST /queues/testqueue/wait/test {"name": "World"}

{
    "result": ["Hello, World!"], 
    "time": 1448567851.689572
}

POST /queues/testqueue/fanout/test {"name": "World"}

{
    "uid": "15b81fb6947711e58154448a5b5fd8c0"
}

Makes a fanout request to all servers active on a specific queue

POST /queues/testqueue/fanout/wait/test {"name": "World"}

{
    "511c043cbcf411e5abaa82576555349e": {
        "result": ["Hello, test!"],
        "time": 1453019709.481646
    }, 
    "f363fe66bcf411e597fa448a5b5fd8c0": {
        "result": ["Hello, test!"],
        "time": 1453019597.918752
    }
}

Makes a fanout request to all servers and waits for all of them to return a result

About

An asynchronous job queue written in Twisted

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages