diff --git a/implementation/matmul-c/Makefile b/implementation/matmul-c/Makefile index 235138a..968e612 100644 --- a/implementation/matmul-c/Makefile +++ b/implementation/matmul-c/Makefile @@ -26,6 +26,7 @@ matmul-c: \ src/matrix.c \ src/operations.c \ src/parse_header.c \ + src/threadpool.c \ src/tests.c $(CC) $(CFLAGS) -o$@ $^ $(CLIBS) diff --git a/implementation/matmul-c/include/matrix.h b/implementation/matmul-c/include/matrix.h index 3f9fb5e..4d15c69 100644 --- a/implementation/matmul-c/include/matrix.h +++ b/implementation/matmul-c/include/matrix.h @@ -18,5 +18,6 @@ Matrix * new_matrix(uint16_t height, uint16_t width); Matrix * delete_matrix(Matrix *A); Matrix * new_matrix_identity(uint16_t height, uint16_t width); void matrix_print(Matrix *A); +void matrix_fill(Matrix *A, uint32_t seed); #endif /* __MATRIX_MATRIX_H */ diff --git a/implementation/matmul-c/include/threadpool.h b/implementation/matmul-c/include/threadpool.h new file mode 100644 index 0000000..21af958 --- /dev/null +++ b/implementation/matmul-c/include/threadpool.h @@ -0,0 +1,24 @@ +/* vim: noet:ts=2:sts=2:sw=2 */ + +/* SPDX-License-Identifier: MIT */ +/* Copyright © 2024 David Llewellyn-Jones */ + +#include +#include +#include +#include +#include +#include + +#include "matrix.h" + +#ifndef __MATRIX_THREADPOOL_H +#define __MATRIX_THREADPOOL_H (1) + +typedef struct _ThreadPool ThreadPool; + +ThreadPool * new_threadpool(); +ThreadPool * delete_threadpool(ThreadPool *pool); +bool multiply_parallel(ThreadPool *pool, Matrix *result, Matrix *A, Matrix *B); + +#endif /* __MATRIX_THREADPOOL_H */ diff --git a/implementation/matmul-c/main.c b/implementation/matmul-c/main.c index a6ccaf6..44c7ca9 100644 --- a/implementation/matmul-c/main.c +++ b/implementation/matmul-c/main.c @@ -9,8 +9,13 @@ #include "operations.h" #include "load.h" #include "tests.h" +#include "threadpool.h" -#define BENCHMARK_REPEAT (32768) +#define BENCHMARK_REPEAT_SMALL (32768) +#define BENCHMARK_REPEAT_LARGE (1) + +#define HEIGHT (2048) +#define WIDTH (2048) int main(int argc, char *argv[]) { Matrix *A; @@ -20,12 +25,19 @@ int main(int argc, char *argv[]) { bool result; uint32_t total; + ThreadPool *pool = new_threadpool(); + // Play around with the API printf("Example matrix manipulation...\n"); A = matrix_load("../testdata/matrix-a.npy"); B = matrix_load("../testdata/matrix-b.npy"); C = matrix_load("../testdata/matrix-c.npy"); D = new_matrix(4, 4); + + printf("pre multiply\n"); + result = multiply_parallel(pool, D, A, B); + printf("Post multiply\n"); + result = multiply(D, A, B); result = equals(C, D); printf("Result of A * B:\n"); @@ -60,21 +72,78 @@ int main(int argc, char *argv[]) { matrix_print(d->matrices[index].matrix); } } - printf("Multiplication tests passed: %u out of %u\n", passed, total); + for (uint32_t index = 0; index < total; ++index) { + result = multiply_parallel(pool, d->matrices[index].matrix, a->matrices[index].matrix, b->matrices[index].matrix); + result = result && equals(c->matrices[index].matrix, d->matrices[index].matrix); + if (result) { + passed += 1; + } + else { + printf("Incorrect result\n"); + matrix_print(c->matrices[index].matrix); + matrix_print(d->matrices[index].matrix); + } + } + printf("Multiplication tests passed: %u out of %u\n", passed, total * 2); // Measure time taken to perform 16777216 multiplications + struct timespec start_time, end_time; + uint32_t operations; + double elapsed; + double ops_per_sec; + + // Create a pair of big random matrices + A = new_matrix(HEIGHT, WIDTH); + B = new_matrix(WIDTH, HEIGHT); + D = new_matrix(HEIGHT, HEIGHT); + matrix_fill(A, 8); + matrix_fill(B, 16); + operations = BENCHMARK_REPEAT_LARGE; + printf("Benchmarking...\n"); - clock_t start_time = clock(); - for (uint32_t count = 0; count < BENCHMARK_REPEAT; ++count) { + + clock_gettime(CLOCK_MONOTONIC, &start_time); + for (uint32_t count = 0; count < operations; ++count) { + multiply(D, A, B); + } + clock_gettime(CLOCK_MONOTONIC, &end_time); + elapsed = (end_time.tv_sec - start_time.tv_sec); + elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0; + + printf("Time taken to perform %u standard large multiply operations: %.02f seconds\n", operations, elapsed); + ops_per_sec = operations / elapsed; + printf("Equivalent to %.02f operations per second\n", ops_per_sec); + + clock_gettime(CLOCK_MONOTONIC, &start_time); + for (uint32_t count = 0; count < operations; ++count) { + multiply_parallel(pool, D, A, B); + } + clock_gettime(CLOCK_MONOTONIC, &end_time); + elapsed = (end_time.tv_sec - start_time.tv_sec); + elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0; + + printf("Time taken to perform %u standard large parallel multiply operations: %.02f seconds\n", operations, elapsed); + ops_per_sec = operations / elapsed; + printf("Equivalent to %.02f operations per second\n", ops_per_sec); + + A = delete_matrix(A); + B = delete_matrix(B); + D = delete_matrix(D); + + clock_gettime(CLOCK_MONOTONIC, &start_time); + for (uint32_t count = 0; count < BENCHMARK_REPEAT_SMALL; ++count) { for (uint32_t index = 0; index < total; ++index) { multiply(d->matrices[index].matrix, a->matrices[index].matrix, b->matrices[index].matrix); } } - clock_t end_time = clock(); - double elapsed = (double)(end_time - start_time) / CLOCKS_PER_SEC; - uint32_t operations = total * BENCHMARK_REPEAT; + clock_gettime(CLOCK_MONOTONIC, &end_time); + + elapsed = (end_time.tv_sec - start_time.tv_sec); + elapsed += (end_time.tv_nsec - start_time.tv_nsec) / 1000000000.0; + + operations = total * BENCHMARK_REPEAT_SMALL; printf("Time taken to perform %u multiply operations: %.02f seconds\n", operations, elapsed); - double ops_per_sec = operations / elapsed; + ops_per_sec = operations / elapsed; printf("Equivalent to %.02f operations per second\n", ops_per_sec); a = delete_matrices(a); @@ -82,6 +151,8 @@ int main(int argc, char *argv[]) { c = delete_matrices(c); d = delete_matrices(d); + pool = delete_threadpool(pool); + return EXIT_SUCCESS; } diff --git a/implementation/matmul-c/src/matrix.c b/implementation/matmul-c/src/matrix.c index 6d2bcf5..a21099c 100644 --- a/implementation/matmul-c/src/matrix.c +++ b/implementation/matmul-c/src/matrix.c @@ -56,5 +56,13 @@ void matrix_print(Matrix *A) { } } - +void matrix_fill(Matrix *A, uint32_t seed) { + srand(seed); + if (A) { + uint32_t size = A->height * A->width; + for (uint32_t index = 0; index < size; ++index) { + A->elements[index] = ((uint32_t)(rand() / (RAND_MAX / 1000))) / 10; + } + } +} diff --git a/implementation/matmul-c/src/threadpool.c b/implementation/matmul-c/src/threadpool.c new file mode 100644 index 0000000..c1ecfa9 --- /dev/null +++ b/implementation/matmul-c/src/threadpool.c @@ -0,0 +1,191 @@ +/* vim: noet:ts=2:sts=2:sw=2 */ + +/* SPDX-License-Identifier: MIT */ +/* Copyright © 2024 David Llewellyn-Jones */ + +#include +#include +#include +#include + +#include "threadpool.h" + +#define MAX_THREADS (8) + +typedef struct _ThreadContext { + pthread_mutex_t *working_mutex; + pthread_cond_t *working_cond; + pthread_mutex_t *begin_mutex; + pthread_cond_t *begin_cond; + uint32_t *working; + bool live; + + Matrix *result; + Matrix *A; + Matrix *B; + uint32_t start; + uint32_t end; +} ThreadContext; + +struct _ThreadPool { + pthread_t thread_id[MAX_THREADS]; + ThreadContext *context[MAX_THREADS]; + + pthread_mutex_t working_mutex; + pthread_cond_t working_cond; + pthread_mutex_t begin_mutex; + pthread_cond_t begin_cond; + uint32_t working; +}; + +void *thread_runner(void *vargp); + +inline void multiply_work(Matrix *result, Matrix *A, Matrix *B, uint32_t start, uint32_t end) { + uint32_t apos = 0; + uint32_t bpos = 0; + // Step through all the entries in the result matrix + for (uint32_t cpos = start; cpos < end; ++cpos) { + // For A start at the beginning of the row + apos = (cpos / result->width) * A->width; + // For B start at the top of a column + bpos = cpos % result->width; + // Accumulator for the products + double element = 0.0f; + // For each entry in the result, sum the product of row/column pairs + for (uint32_t pos = 0; pos < A->width; ++pos) { + // Accumulate the products + element += A->elements[apos] * B->elements[bpos]; + // Row stride of 1 + apos += 1; + // Column stride of "width of B" + bpos += B->width; + } + // Write out the resulting element + result->elements[cpos] = element; + } +} + +ThreadPool * new_threadpool() { + ThreadPool *pool = calloc(sizeof(ThreadPool), sizeof(char)); + + if (pool) { + // Initialise the pool context + pthread_mutex_init(&pool->working_mutex, NULL); + pthread_cond_init(&pool->working_cond, NULL); + pthread_mutex_init(&pool->begin_mutex, NULL); + pthread_cond_init(&pool->begin_cond, NULL); + pool->working = MAX_THREADS; + + // Initialise the threads + for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) { + pool->context[thread] = calloc(sizeof(ThreadContext), sizeof(char)); + pool->context[thread]->working_mutex = &pool->working_mutex; + pool->context[thread]->working_cond = &pool->working_cond; + pool->context[thread]->begin_mutex = &pool->begin_mutex; + pool->context[thread]->begin_cond = &pool->begin_cond; + pool->context[thread]->working = &pool->working; + pool->context[thread]->live = true; + + pthread_create(&pool->thread_id[thread], NULL, thread_runner, pool->context[thread]); + } + + // Wait for the threads to initialise + pthread_mutex_lock(&pool->working_mutex); + while (pool->working > 0) { + pthread_cond_wait(&pool->working_cond, &pool->working_mutex); + } + pthread_mutex_unlock(&pool->working_mutex); + } + return pool; +} + +ThreadPool * delete_threadpool(ThreadPool *pool) { + if (pool) { + // Remove all work + pthread_mutex_lock(&pool->begin_mutex); + for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) { + pool->context[thread]->live = false; + } + pthread_cond_broadcast(&pool->begin_cond); + pthread_mutex_unlock(&pool->begin_mutex); + + // Wait for the threads to complete + for (uint32_t thread = 0; thread < MAX_THREADS; ++thread) { + pthread_join(pool->thread_id[thread], NULL); + free(pool->context[thread]); + } + + pthread_mutex_destroy(&pool->working_mutex); + pthread_cond_destroy(&pool->working_cond); + pthread_mutex_destroy(&pool->begin_mutex); + pthread_cond_destroy(&pool->begin_cond); + + free(pool); + } + return NULL; +} + +void *thread_runner(void *vargp) { + ThreadContext *context = (ThreadContext*)vargp; + + while (context->live) { + // Signal that the work is done + pthread_mutex_lock(context->working_mutex); + pthread_mutex_lock(context->begin_mutex); + --(*context->working); + pthread_cond_signal(context->working_cond); + pthread_mutex_unlock(context->working_mutex); + + // Wait for some work to arrive + pthread_cond_wait(context->begin_cond, context->begin_mutex); + pthread_mutex_unlock(context->begin_mutex); + + if (context->live) { + // Do the work + multiply_work(context->result, context->A, context->B, context->start, context->end); + context->start = 0; + context->end = 0; + } + } + // No more work to do + + return NULL; +} + +bool multiply_parallel(ThreadPool *pool, Matrix *result, Matrix *A, Matrix *B) { + uint32_t size = result->height * result->width; + + uint32_t chunk = (size + (MAX_THREADS - 1)) / MAX_THREADS; + uint32_t allocated = 0; + uint32_t thread = 0; + + // Allocate the work + while (allocated < size) { + pool->context[thread]->result = result; + pool->context[thread]->A = A; + pool->context[thread]->B = B; + pool->context[thread]->start = allocated; + allocated += chunk; + if (allocated > size) { + allocated = size; + } + pool->context[thread]->end = allocated; + thread += 1; + } + + // Trigger the runners to work + pthread_mutex_lock(&pool->begin_mutex); + pool->working = MAX_THREADS; + pthread_cond_broadcast(&pool->begin_cond); + pthread_mutex_unlock(&pool->begin_mutex); + + // Wait for the work to complete + pthread_mutex_lock(&pool->working_mutex); + while (pool->working > 0) { + pthread_cond_wait(&pool->working_cond, &pool->working_mutex); + } + pthread_mutex_unlock(&pool->working_mutex); + + return true; +} +