Skip to content

shomali11/parallelizer

Repository files navigation

parallelizer Build Status Go Report Card GoDoc License: MIT

Simplifies creating a pool of workers that execute jobs in parallel

Features

  • Easy to use
  • Context Support
  • Fail fast with errors
  • Customizable Pool Size
    • Default number of workers is 10
  • Customizable Job Queue Size
    • Default size is 100

Examples

Example 1

Running multiple function calls in parallel without a timeout.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		for char := 'a'; char < 'a'+3; char++ {
			fmt.Printf("%c ", char)
		}
		return nil
	})

	group.Add(func() error {
		for number := 1; number < 4; number++ {
			fmt.Printf("%d ", number)
		}
		return nil
	})

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

a 1 b 2 c 3 
Done
Error: <nil>

Example 2

Running multiple slow function calls in parallel with a context with a short timeout. Note: The timeout will not kill the routines. It will just stop waiting for them to finish

package main

import (
	"context"
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		time.Sleep(2 * time.Second)

		fmt.Println("Finished work 1")

		return nil
	})

	group.Add(func() error {
		time.Sleep(2 * time.Second)

		fmt.Println("Finished work 2")

		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := group.Wait(parallelizer.WithContext(ctx))

	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
	fmt.Println()

	time.Sleep(2 * time.Second)
}

Output:

Done
Error: context deadline exceeded
Finished work 2
Finished work 1

Example 3

Running multiple function calls in parallel with a large enough worker pool.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(10))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		i := i
		group.Add(func() error {
			fmt.Print(i, " ")
			return nil
		})
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

7 6 3 2 8 9 5 10 1 4  
Done
Error: <nil>

Example 4

Running multiple function calls with 1 worker. Note: the functions are no longer executed in parallel but sequentially

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(1))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		i := i
		group.Add(func() error {
			fmt.Print(i, " ")
			return nil
		})
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

1 2 3 4 5 6 7 8 9 10 
Done
Error: <nil>

Example 5

Running multiple function calls in parallel with a small worker pool and job queue size. Note: the Add call blocks until there is space to push into the Job Queue

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(1), parallelizer.WithJobQueueSize(1))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		group.Add(func() error {
			time.Sleep(time.Second)
			return nil
		})

		fmt.Println("Job added at", time.Now().Format("04:05"))
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Job added at 00:12
Job added at 00:13
Job added at 00:14
Job added at 00:15
Job added at 00:16
Job added at 00:17
Job added at 00:18
Job added at 00:19
Job added at 00:20
Job added at 00:21

Done
Error: <nil>

Example 6

Running multiple function calls in parallel with a large enough worker pool and job queue size. Note: In here the Add calls did not block because there was plenty of space in the Job Queue

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup(parallelizer.WithPoolSize(10), parallelizer.WithJobQueueSize(10))
	defer group.Close()

	for i := 1; i <= 10; i++ {
		group.Add(func() error {
			time.Sleep(time.Second)
			return nil
		})

		fmt.Println("Job added at", time.Now().Format("04:05"))
	}

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30
Job added at 00:30

Done
Error: <nil>

Example 7

Showing an example without calling Wait

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
	"time"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		fmt.Println("Finished work")
		return nil
	})

	fmt.Println("We did not wait!")

	time.Sleep(time.Second)
}

Output:

We did not wait!
Finished work

Example 8

Showing an example with a mixture of Add and Wait calls.

package main

import (
	"fmt"
	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		fmt.Println("Worker 1")
		return nil
	})

	group.Add(func() error {
		fmt.Println("Worker 2")
		return nil
	})

	fmt.Println("Waiting for workers 1 and 2 to finish")

	group.Wait()

	fmt.Println("Workers 1 and 2 have finished")

	group.Add(func() error {
		fmt.Println("Worker 3")
		return nil
	})

	fmt.Println("Waiting for worker 3 to finish")

	group.Wait()

	fmt.Println("Worker 3 has finished")
}

Output:

Waiting for workers 1 and 2 to finish
Worker 1
Worker 2
Workers 1 and 2 have finished
Waiting for worker 3 to finish
Worker 3
Worker 3 has finished

Example 9

Showing an example with a failed task.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/shomali11/parallelizer"
)

func main() {
	group := parallelizer.NewGroup()
	defer group.Close()

	group.Add(func() error {
		return errors.New("something went wrong")
	})

	group.Add(func() error {
		time.Sleep(10 * time.Second)
		return nil
	})

	err := group.Wait()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}

Output:

Done
Error: something went wrong