Skip to content

Commit

Permalink
Add multithreaded matmul-c implementation
Browse files Browse the repository at this point in the history
Adds a CPU-parallel matrix multiplication implementation using a threadpool.
Parallelism is intra-matrix (i.e. different parts of a single
matrix-multiply operation are performed on different threads) so that
threading is transparent to the caller.
  • Loading branch information
llewelld committed Apr 23, 2024
1 parent 9aa7662 commit 6b00297
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 9 deletions.
1 change: 1 addition & 0 deletions implementation/matmul-c/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ matmul-c: \
src/matrix.c \
src/operations.c \
src/parse_header.c \
src/threadpool.c \
src/tests.c
$(CC) $(CFLAGS) -o$@ $^ $(CLIBS)

Expand Down
1 change: 1 addition & 0 deletions implementation/matmul-c/include/matrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ Matrix * new_matrix(uint16_t height, uint16_t width);
Matrix * delete_matrix(Matrix *A);
Matrix * new_matrix_identity(uint16_t height, uint16_t width);
void matrix_print(Matrix *A);
void matrix_fill(Matrix *A, uint32_t seed);

#endif /* __MATRIX_MATRIX_H */
24 changes: 24 additions & 0 deletions implementation/matmul-c/include/threadpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* vim: noet:ts=2:sts=2:sw=2 */

/* SPDX-License-Identifier: MIT */
/* Copyright © 2024 David Llewellyn-Jones */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>

#include "matrix.h"

#ifndef __MATRIX_THREADPOOL_H
#define __MATRIX_THREADPOOL_H (1)

typedef struct _ThreadPool ThreadPool;

ThreadPool * new_threadpool();
ThreadPool * delete_threadpool(ThreadPool *pool);
bool multiply_parallel(ThreadPool *pool, Matrix *result, Matrix *A, Matrix *B);

#endif /* __MATRIX_THREADPOOL_H */
87 changes: 79 additions & 8 deletions implementation/matmul-c/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
#include "operations.h"
#include "load.h"
#include "tests.h"
#include "threadpool.h"

#define BENCHMARK_REPEAT (32768)
#define BENCHMARK_REPEAT_SMALL (32768)
#define BENCHMARK_REPEAT_LARGE (1)

#define HEIGHT (2048)
#define WIDTH (2048)

int main(int argc, char *argv[]) {
Matrix *A;
Expand All @@ -20,12 +25,19 @@ int main(int argc, char *argv[]) {
bool result;
uint32_t total;

ThreadPool *pool = new_threadpool();

// Play around with the API
printf("Example matrix manipulation...\n");
A = matrix_load("../testdata/matrix-a.npy");
B = matrix_load("../testdata/matrix-b.npy");
C = matrix_load("../testdata/matrix-c.npy");
D = new_matrix(4, 4);

printf("pre multiply\n");
result = multiply_parallel(pool, D, A, B);
printf("Post multiply\n");

result = multiply(D, A, B);
result = equals(C, D);
printf("Result of A * B:\n");
Expand Down Expand Up @@ -60,28 +72,87 @@ int main(int argc, char *argv[]) {
matrix_print(d->matrices[index].matrix);
}
}
printf("Multiplication tests passed: %u out of %u\n", passed, total);
for (uint32_t index = 0; index < total; ++index) {
result = multiply_parallel(pool, d->matrices[index].matrix, a->matrices[index].matrix, b->matrices[index].matrix);
result = result && equals(c->matrices[index].matrix, d->matrices[index].matrix);
if (result) {
passed += 1;
}
else {
printf("Incorrect result\n");
matrix_print(c->matrices[index].matrix);
matrix_print(d->matrices[index].matrix);
}
}
printf("Multiplication tests passed: %u out of %u\n", passed, total * 2);

// Measure time taken to perform 16777216 multiplications
struct timespec start_time, end_time;
uint32_t operations;
double elapsed;
double ops_per_sec;

// Create a pair of big random matrices
A = new_matrix(HEIGHT, WIDTH);
B = new_matrix(WIDTH, HEIGHT);
D = new_matrix(HEIGHT, HEIGHT);
matrix_fill(A, 8);
matrix_fill(B, 16);
operations = BENCHMARK_REPEAT_LARGE;

printf("Benchmarking...\n");
clock_t start_time = clock();
for (uint32_t count = 0; count < BENCHMARK_REPEAT; ++count) {

clock_gettime(CLOCK_MONOTONIC, &start_time);
for (uint32_t count = 0; count < operations; ++count) {
multiply(D, A, B);
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
elapsed = (end_time.tv_sec - start_time.tv_sec);
elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0;

printf("Time taken to perform %u standard large multiply operations: %.02f seconds\n", operations, elapsed);
ops_per_sec = operations / elapsed;
printf("Equivalent to %.02f operations per second\n", ops_per_sec);

clock_gettime(CLOCK_MONOTONIC, &start_time);
for (uint32_t count = 0; count < operations; ++count) {
multiply_parallel(pool, D, A, B);
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
elapsed = (end_time.tv_sec - start_time.tv_sec);
elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0;

printf("Time taken to perform %u standard large parallel multiply operations: %.02f seconds\n", operations, elapsed);
ops_per_sec = operations / elapsed;
printf("Equivalent to %.02f operations per second\n", ops_per_sec);

A = delete_matrix(A);
B = delete_matrix(B);
D = delete_matrix(D);

clock_gettime(CLOCK_MONOTONIC, &start_time);
for (uint32_t count = 0; count < BENCHMARK_REPEAT_SMALL; ++count) {
for (uint32_t index = 0; index < total; ++index) {
multiply(d->matrices[index].matrix, a->matrices[index].matrix, b->matrices[index].matrix);
}
}
clock_t end_time = clock();
double elapsed = (double)(end_time - start_time) / CLOCKS_PER_SEC;
uint32_t operations = total * BENCHMARK_REPEAT;
clock_gettime(CLOCK_MONOTONIC, &end_time);

elapsed = (end_time.tv_sec - start_time.tv_sec);
elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0;

operations = total * BENCHMARK_REPEAT_SMALL;
printf("Time taken to perform %u multiply operations: %.02f seconds\n", operations, elapsed);
double ops_per_sec = operations / elapsed;
ops_per_sec = operations / elapsed;
printf("Equivalent to %.02f operations per second\n", ops_per_sec);

a = delete_matrices(a);
b = delete_matrices(b);
c = delete_matrices(c);
d = delete_matrices(d);

pool = delete_threadpool(pool);

return EXIT_SUCCESS;
}

Expand Down
10 changes: 9 additions & 1 deletion implementation/matmul-c/src/matrix.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,13 @@ void matrix_print(Matrix *A) {
}
}


void matrix_fill(Matrix *A, uint32_t seed) {
srand(seed);
if (A) {
uint32_t size = A->height * A->width;
for (uint32_t index = 0; index < size; ++index) {
A->elements[index] = ((uint32_t)(rand() / (RAND_MAX / 1000))) / 10;
}
}
}

191 changes: 191 additions & 0 deletions implementation/matmul-c/src/threadpool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/* vim: noet:ts=2:sts=2:sw=2 */

/* SPDX-License-Identifier: MIT */
/* Copyright © 2024 David Llewellyn-Jones */

#include <stdio.h>
#include <stdint.h>
#include <math.h>
#include <stdbool.h>

#include "threadpool.h"

#define MAX_THREADS (8)

typedef struct _ThreadContext {
pthread_mutex_t *working_mutex;
pthread_cond_t *working_cond;
pthread_mutex_t *begin_mutex;
pthread_cond_t *begin_cond;
uint32_t *working;
bool live;

Matrix *result;
Matrix *A;
Matrix *B;
uint32_t start;
uint32_t end;
} ThreadContext;

struct _ThreadPool {
pthread_t thread_id[MAX_THREADS];
ThreadContext *context[MAX_THREADS];

pthread_mutex_t working_mutex;
pthread_cond_t working_cond;
pthread_mutex_t begin_mutex;
pthread_cond_t begin_cond;
uint32_t working;
};

void *thread_runner(void *vargp);

inline void multiply_work(Matrix *result, Matrix *A, Matrix *B, uint32_t start, uint32_t end) {
uint32_t apos = 0;
uint32_t bpos = 0;
// Step through all the entries in the result matrix
for (uint32_t cpos = start; cpos < end; ++cpos) {
// For A start at the beginning of the row
apos = (cpos / result->width) * A->width;
// For B start at the top of a column
bpos = cpos % result->width;
// Accumulator for the products
double element = 0.0f;
// For each entry in the result, sum the product of row/column pairs
for (uint32_t pos = 0; pos < A->width; ++pos) {
// Accumulate the products
element += A->elements[apos] * B->elements[bpos];
// Row stride of 1
apos += 1;
// Column stride of "width of B"
bpos += B->width;
}
// Write out the resulting element
result->elements[cpos] = element;
}
}

ThreadPool * new_threadpool() {
ThreadPool *pool = calloc(sizeof(ThreadPool), sizeof(char));

if (pool) {
// Initialise the pool context
pthread_mutex_init(&pool->working_mutex, NULL);
pthread_cond_init(&pool->working_cond, NULL);
pthread_mutex_init(&pool->begin_mutex, NULL);
pthread_cond_init(&pool->begin_cond, NULL);
pool->working = MAX_THREADS;

// Initialise the threads
for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) {
pool->context[thread] = calloc(sizeof(ThreadContext), sizeof(char));
pool->context[thread]->working_mutex = &pool->working_mutex;
pool->context[thread]->working_cond = &pool->working_cond;
pool->context[thread]->begin_mutex = &pool->begin_mutex;
pool->context[thread]->begin_cond = &pool->begin_cond;
pool->context[thread]->working = &pool->working;
pool->context[thread]->live = true;

pthread_create(&pool->thread_id[thread], NULL, thread_runner, pool->context[thread]);
}

// Wait for the threads to initialise
pthread_mutex_lock(&pool->working_mutex);
while (pool->working > 0) {
pthread_cond_wait(&pool->working_cond, &pool->working_mutex);
}
pthread_mutex_unlock(&pool->working_mutex);
}
return pool;
}

ThreadPool * delete_threadpool(ThreadPool *pool) {
if (pool) {
// Remove all work
pthread_mutex_lock(&pool->begin_mutex);
for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) {
pool->context[thread]->live = false;
}
pthread_cond_broadcast(&pool->begin_cond);
pthread_mutex_unlock(&pool->begin_mutex);

// Wait for the threads to complete
for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) {
pthread_join(pool->thread_id[thread], NULL);
free(pool->context[thread]);
}

pthread_mutex_destroy(&pool->working_mutex);
pthread_cond_destroy(&pool->working_cond);
pthread_mutex_destroy(&pool->begin_mutex);
pthread_cond_destroy(&pool->begin_cond);

free(pool);
}
return NULL;
}

void *thread_runner(void *vargp) {
ThreadContext *context = (ThreadContext*)vargp;

while (context->live) {
// Signal that the work is done
pthread_mutex_lock(context->working_mutex);
pthread_mutex_lock(context->begin_mutex);
--(*context->working);
pthread_cond_signal(context->working_cond);
pthread_mutex_unlock(context->working_mutex);

// Wait for some work to arrive
pthread_cond_wait(context->begin_cond, context->begin_mutex);
pthread_mutex_unlock(context->begin_mutex);

if (context->live) {
// Do the work
multiply_work(context->result, context->A, context->B, context->start, context->end);
context->start = 0;
context->end = 0;
}
}
// No more work to do

return NULL;
}

bool multiply_parallel(ThreadPool *pool, Matrix *result, Matrix *A, Matrix *B) {
uint32_t size = result->height * result->width;

uint32_t chunk = (size + (MAX_THREADS - 1)) / MAX_THREADS;
uint32_t allocated = 0;
uint32_t thread = 0;

// Allocate the work
while (allocated < size) {
pool->context[thread]->result = result;
pool->context[thread]->A = A;
pool->context[thread]->B = B;
pool->context[thread]->start = allocated;
allocated += chunk;
if (allocated > size) {
allocated = size;
}
pool->context[thread]->end = allocated;
thread += 1;
}

// Trigger the runners to work
pthread_mutex_lock(&pool->begin_mutex);
pool->working = MAX_THREADS;
pthread_cond_broadcast(&pool->begin_cond);
pthread_mutex_unlock(&pool->begin_mutex);

// Wait for the work to complete
pthread_mutex_lock(&pool->working_mutex);
while (pool->working > 0) {
pthread_cond_wait(&pool->working_cond, &pool->working_mutex);
}
pthread_mutex_unlock(&pool->working_mutex);

return true;
}

0 comments on commit 6b00297

Please sign in to comment.