utilitiec/src/ThreadPool/ThreadPool.c
2024-06-13 15:28:21 +02:00

412 lines
9.8 KiB
C

#include "ThreadPool.h"

#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/**
 * Records the outcome of a finished job.
 *
 * Spins until pool->rw_lock is held, then either frees the job slot
 * (when the job carries THREADPOOLJOB_DISCARD_RESULT) or stores `result`
 * in the job and appends the job to the tail of the pool's result list.
 *
 * @param pool   Pool owning the job storage and the result list.
 * @param job_id Slot id of the finished job inside pool->job_storage.
 * @param result Value returned by the job function.
 */
static void _ThreadPoolWorker_StoreResult(ThreadPool* pool, size_t job_id, void* result)
{
    // Retry until the lock is ours; there is nobody to report a failure to
    for (;;) {
        if (OSMutex_Acquire(&pool->rw_lock) == EXIT_SUCCESS) break;
    }

    ThreadPoolJob* finished = FreeList_GetPointer(&pool->job_storage, job_id);
    bool keep_result = (finished->flags & THREADPOOLJOB_DISCARD_RESULT) == 0;

    if (!keep_result) {
        // Result is not wanted: give the slot back right away
        FreeList_Free(&pool->job_storage, job_id);
    } else {
        finished->result = result;
        if (pool->first_result != SIZE_MAX) {
            // Non-empty result list: hook the job behind the current tail
            ThreadPoolJob* tail = FreeList_GetPointer(
                &pool->job_storage,
                pool->last_result
            );
            tail->next = job_id;
        } else {
            // Empty result list: the job becomes the head as well
            pool->first_result = job_id;
        }
        pool->last_result = job_id;
    }

    OSMutex_Release(&pool->rw_lock);
}
/**
 * Thread entry point of a pool worker.
 *
 * Loops forever: waits on pool->wait_queue, pops the head of the pending
 * job list under pool->rw_lock, runs the job outside the lock and hands
 * the return value to _ThreadPoolWorker_StoreResult().
 *
 * @param pool Pool this worker serves.
 * @return NULL; not reached in practice because the loop never exits.
 */
static void* ThreadPoolWorker_Main(ThreadPool* pool)
{
#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
    // Make the thread cancellable at any point, not only at cancellation
    // points (presumably so OSThread_Kill() can stop a running job).
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif
    while (true) {
        // Wait until a job is available
        int wait_code = OSSemaphore_Wait(&pool->wait_queue);
        if (wait_code != EXIT_SUCCESS) {
            // Can't do anything about it, try again
            continue;
        }

        // Get ThreadPool lock for rw
        int acquire_code = OSMutex_Acquire(&pool->rw_lock);
        if (acquire_code != EXIT_SUCCESS) {
            // Locking failed: hand the token back so the job is not lost
            OSSemaphore_Release(&pool->wait_queue);
            continue;
        }

        // Grab first job
        size_t job_id = pool->first_job;
        if (job_id == SIZE_MAX) {
            // Stale token: ThreadPool_UnqueueJob() removes jobs from the
            // list without taking a token off the semaphore, so the queue
            // can be empty here. Looking up slot SIZE_MAX would be invalid.
            OSMutex_Release(&pool->rw_lock);
            continue;
        }
        ThreadPoolJob* job = FreeList_GetPointer(&pool->job_storage, job_id);

        // Make next job available
        pool->first_job = job->next;
        if (job->next == SIZE_MAX) pool->last_job = SIZE_MAX;
        job->next = SIZE_MAX;

        // Copy the function and the argument to leave the critical section
        ThreadFunction job_function = job->job;
        void* job_argument = job->arg;

        // Unlock again
        OSMutex_Release(&pool->rw_lock);

        // Perform job
        void* result = job_function(job_argument);
        _ThreadPoolWorker_StoreResult(pool, job_id, result);
    }
    return NULL;
}
/**
 * Starts a thread running ThreadPoolWorker_Main() for the given worker slot.
 *
 * @param pool   Pool passed to the new thread as its argument.
 * @param worker Slot whose thread handle is filled in; on success it is
 *               marked alive and its job field is set to SIZE_MAX.
 * @return EXIT_SUCCESS if the thread was created, EXIT_FAILURE otherwise
 *         (the slot's alive/job fields are left untouched in that case).
 */
static int _ThreadPool_SpawnWorker(ThreadPool* pool, ThreadPoolWorker* worker)
{
    ThreadFunction entry_point = (ThreadFunction) ThreadPoolWorker_Main;

    if (OSThread_Create(&worker->thread, entry_point, pool) == EXIT_SUCCESS) {
        worker->alive = true;
        worker->job = SIZE_MAX;
        return EXIT_SUCCESS;
    }
    return EXIT_FAILURE;
}
/**
 * Counts the worker slots of `pool` whose alive flag is set.
 *
 * @param pool Pool whose workers array (worker_count entries) is scanned.
 * @return Number of slots with alive == true.
 */
static size_t _ThreadPool_CountAliveWorkers(ThreadPool* pool)
{
    size_t alive_total = 0;
    size_t remaining = pool->worker_count;

    // Order does not matter for a count, so walk the array back to front
    while (remaining > 0) {
        remaining--;
        if (pool->workers[remaining].alive == true) {
            alive_total++;
        }
    }
    return alive_total;
}
/**
 * Initialises a thread pool in `target`.
 *
 * Creates the job storage (sized worker_count * 8), the wait semaphore,
 * the lock and the worker array. Unless THREADPOOL_KILL_WORKERS is set in
 * `flags`, one thread per worker slot is started immediately; with the flag
 * set all slots start out not alive and ThreadPool_QueueJob() spawns them.
 *
 * @param target       Pool structure to initialise.
 * @param worker_count Number of worker slots; must be non-zero.
 * @param flags        Pool flags, stored in target->flags.
 * @param allocator    Allocator used for the job storage and worker array.
 * @return EXIT_SUCCESS on success,
 *         EDESTADDRREQ if `target` is NULL,
 *         EINVAL if `worker_count` is 0,
 *         ENOMEM if storage, semaphore, mutex or worker array creation fails,
 *         EXIT_FAILURE if a worker thread could not be started.
 *         On any failure everything created so far is torn down again.
 */
int ThreadPool_Create(ThreadPool* target, size_t worker_count, int8_t flags, allocator_t* allocator)
{
    if (target == NULL) return EDESTADDRREQ;
    if (worker_count == 0) return EINVAL;

    int return_code = EXIT_SUCCESS;
    // Number of threads actually started; the cleanup path relies on this
    // instead of reading per-slot state that may never have been written.
    size_t spawned = 0;

    // Fully initialise the bookkeeping before any thread can see the pool
    target->flags = flags;
    target->allocator = allocator;
    target->first_job = SIZE_MAX;
    target->first_result = SIZE_MAX;
    target->last_job = SIZE_MAX;
    target->last_result = SIZE_MAX;

    int freelist_code = FreeList_Create(&target->job_storage, sizeof(ThreadPoolJob), worker_count * 8, allocator);
    if (freelist_code != EXIT_SUCCESS) {
        return ENOMEM;
    }

    int semaphore_code = OSSemaphore_Create(&target->wait_queue, 0U);
    if (semaphore_code != EXIT_SUCCESS) {
        return_code = ENOMEM;
        goto defer_freelist;
    }

    int mutex_code = OSMutex_Create(&target->rw_lock);
    if (mutex_code != EXIT_SUCCESS) {
        return_code = ENOMEM;
        goto defer_semaphore;
    }

    ThreadPoolWorker* worker_array = Allocator_AllocateArray(
        allocator,
        worker_count,
        sizeof(ThreadPoolWorker)
    );
    if (worker_array == NULL) {
        return_code = ENOMEM;
        goto defer_mutex;
    }
    target->workers = worker_array;
    target->worker_count = worker_count;

    // Give every slot a defined state; the allocation is not assumed to be
    // zeroed, and _ThreadPool_SpawnWorker() leaves a slot untouched on failure.
    for (size_t i = 0; i < worker_count; i++) {
        worker_array[i].alive = false;
        worker_array[i].job = SIZE_MAX;
    }

    if (!(flags & THREADPOOL_KILL_WORKERS)) {
        for (; spawned < worker_count; spawned++) {
            int thread_code = _ThreadPool_SpawnWorker(
                target,
                worker_array + spawned
            );
            if (thread_code != EXIT_SUCCESS) {
                return_code = EXIT_FAILURE;
                goto defer_threads;
            }
        }
    }

    return EXIT_SUCCESS;

defer_threads:
    // Stop exactly the threads that were started, and wait for them before
    // the mutex and semaphore they may be blocked on are destroyed below.
    for (size_t i = 0; i < spawned; i++) {
        void* ignored_result;
        OSThread_Kill(&worker_array[i].thread);
        OSThread_Join(&worker_array[i].thread, &ignored_result);
        OSThread_Destroy(&worker_array[i].thread);
    }
    Allocator_FreeArray(
        allocator,
        worker_array,
        sizeof(ThreadPoolWorker),
        worker_count);
defer_mutex:
    OSMutex_Destroy(&target->rw_lock);
defer_semaphore:
    OSSemaphore_Destroy(&target->wait_queue);
defer_freelist:
    FreeList_Destroy(&target->job_storage);
    return return_code;
}
/**
 * Appends a copy of `job` to the pool's pending job list and signals the
 * wait semaphore once.
 *
 * When the pool has THREADPOOL_KILL_WORKERS set, the first worker slot that
 * is not alive is spawned for the job. A spawn failure is ignored as long
 * as at least one worker is alive; if none is, the job is not queued.
 *
 * @param pool       Pool to queue into.
 * @param job        Job description; copied into pool-owned storage.
 * @param job_id_out Optional; receives the id of the queued job.
 * @return EXIT_SUCCESS if the job was queued,
 *         EINVAL if `pool` or `job` is NULL,
 *         ECANCELED if the pool lock could not be acquired,
 *         ENOMEM if no job slot could be allocated,
 *         EXIT_FAILURE if no worker is alive and none could be spawned.
 */
int ThreadPool_QueueJob(ThreadPool* pool, ThreadPoolJob* job, size_t* job_id_out)
{
    if (pool == NULL) return EINVAL;
    if (job == NULL) return EINVAL;

    int return_code = EXIT_SUCCESS;

    int acquire_code = OSMutex_Acquire(&pool->rw_lock);
    if (acquire_code != EXIT_SUCCESS) return ECANCELED;

    size_t job_id;
    if (FreeList_Allocate(&pool->job_storage, &job_id) != EXIT_SUCCESS) {
        return_code = ENOMEM;
        goto defer_mutex;
    }

    // Make sure somebody can run the job BEFORE it is linked into the queue,
    // so a failure only has to give the slot back. A freshly spawned worker
    // blocks on the semaphore/lock until this function is done, so the order
    // is invisible to it.
    if (pool->flags & THREADPOOL_KILL_WORKERS) {
        for (size_t i = 0; i < pool->worker_count; i++) {
            if (pool->workers[i].alive) continue;

            int create_code = _ThreadPool_SpawnWorker(
                pool,
                pool->workers + i
            );
            if (create_code != EXIT_SUCCESS
                && _ThreadPool_CountAliveWorkers(pool) == 0) {
                // Nobody could ever run the job: undo the allocation and
                // leave through the common exit so the lock is released.
                FreeList_Free(&pool->job_storage, job_id);
                return_code = EXIT_FAILURE;
                goto defer_mutex;
            }
            // Either the spawn succeeded, or it failed while other workers
            // are still alive (fail silently). One attempt per job.
            break;
        }
    }

    ThreadPoolJob* pjob = FreeList_GetPointer(&pool->job_storage, job_id);
    memcpy(pjob, job, sizeof(*job));
    pjob->next = SIZE_MAX;

    // Append to the pending list
    if (pool->last_job != SIZE_MAX) {
        ThreadPoolJob* last_job = FreeList_GetPointer(
            &pool->job_storage,
            pool->last_job
        );
        last_job->next = job_id;
    } else {
        pool->first_job = job_id;
    }
    pool->last_job = job_id;

    // Wake one worker; it cannot touch the list until the lock is released
    OSSemaphore_Release(&pool->wait_queue);

    if (job_id_out != NULL) job_id_out[0] = job_id;

defer_mutex:
    OSMutex_Release(&pool->rw_lock);
    return return_code;
}
/**
 * Removes a job from the pending job list and frees its slot.
 *
 * @param pool   Pool to remove the job from.
 * @param job_id Id of the job, as returned through ThreadPool_QueueJob().
 * @return EXIT_SUCCESS if the job was found in the pending list and removed,
 *         EINVAL if `pool` is NULL or `job_id` is outside the job storage,
 *         ECANCELED if the pool lock could not be acquired,
 *         EINPROGRESS if the job is not in the pending list (for example
 *         because a worker already took it off the list).
 *
 * NOTE(review): the wait_queue semaphore is not decremented here, so one
 * token stays behind for every removed job; workers must tolerate waking
 * up to an empty pending list.
 */
int ThreadPool_UnqueueJob(ThreadPool* pool, size_t job_id)
{
    if (pool == NULL) return EINVAL;
    if (job_id >= pool->job_storage.list.capacity) return EINVAL;

    if (OSMutex_Acquire(&pool->rw_lock) != EXIT_SUCCESS) return ECANCELED;

    int exit_code = EINPROGRESS;

    // Walk the pending list, remembering the predecessor of the current
    // node. An empty list (first_job == SIZE_MAX) skips the loop entirely,
    // so slot SIZE_MAX is never looked up.
    size_t previous_id = SIZE_MAX;
    size_t current_id = pool->first_job;
    while (current_id != SIZE_MAX && current_id != job_id) {
        ThreadPoolJob* current_job = FreeList_GetPointer(&pool->job_storage, current_id);
        previous_id = current_id;
        current_id = current_job->next;
    }

    if (current_id != SIZE_MAX) {
        ThreadPoolJob* target_job = FreeList_GetPointer(&pool->job_storage, job_id);

        // Unlink: either move the head or bridge over the removed node
        if (previous_id == SIZE_MAX) {
            pool->first_job = target_job->next;
        } else {
            ThreadPoolJob* previous_job = FreeList_GetPointer(&pool->job_storage, previous_id);
            previous_job->next = target_job->next;
        }

        // Removing the tail moves the tail to the predecessor; this is
        // SIZE_MAX when the list becomes empty.
        if (pool->last_job == job_id) pool->last_job = previous_id;

        FreeList_Free(&pool->job_storage, job_id);
        exit_code = EXIT_SUCCESS;
    }

    OSMutex_Release(&pool->rw_lock);
    return exit_code;
}
/**
 * Tells whether a job's result is waiting in the pool's result list.
 *
 * @param pool   Pool to inspect.
 * @param job_id Id of the job, as returned through ThreadPool_QueueJob().
 * @return true if `job_id` is found in the result list; false if it is not,
 *         if `pool` is NULL, if `job_id` is outside the job storage, or if
 *         the pool lock could not be acquired.
 */
bool ThreadPool_HasFinished(ThreadPool* pool, size_t job_id)
{
    if (pool == NULL) return false;
    if (job_id >= pool->job_storage.list.capacity) return false;

    // Without the lock the list may change under us, and releasing a mutex
    // that was never acquired is not valid either: report "not finished".
    if (OSMutex_Acquire(&pool->rw_lock) != EXIT_SUCCESS) return false;

    size_t current = pool->first_result;
    while (current != job_id && current != SIZE_MAX) {
        ThreadPoolJob* current_result = FreeList_GetPointer(&pool->job_storage, current);
        current = current_result->next;
    }

    OSMutex_Release(&pool->rw_lock);

    // The walk ended either on the job or on the end-of-list marker
    return current == job_id && current != SIZE_MAX;
}
/**
 * Fetches the result of a finished job, removes the job from the result
 * list and frees its slot.
 *
 * Callers are expected to check ThreadPool_HasFinished() first.
 *
 * @param pool       Pool that ran the job.
 * @param job_id     Id of the job, as returned through ThreadPool_QueueJob().
 * @param job_result Receives the job's return value; left untouched unless
 *                   EXIT_SUCCESS is returned.
 * @return EXIT_SUCCESS if the result was stored in job_result[0],
 *         EINVAL if `pool` is NULL or `job_id` is outside the job storage,
 *         EDESTADDRREQ if `job_result` is NULL,
 *         ECANCELED if the pool lock could not be acquired,
 *         EINPROGRESS if no result for `job_id` is in the result list.
 */
int ThreadPool_GetJobResult(ThreadPool* pool, size_t job_id, void** job_result)
{
    // These used to `return false`, which is 0 == EXIT_SUCCESS for an int
    if (pool == NULL) return EINVAL;
    if (job_id >= pool->job_storage.list.capacity) return EINVAL;
    if (job_result == NULL) return EDESTADDRREQ;

    if (OSMutex_Acquire(&pool->rw_lock) != EXIT_SUCCESS) return ECANCELED;

    int return_code = EINPROGRESS;

    // Walk the result list, remembering the predecessor of the current
    // node. An empty list (first_result == SIZE_MAX) skips the loop, so
    // slot SIZE_MAX is never looked up.
    size_t previous_id = SIZE_MAX;
    size_t current_id = pool->first_result;
    while (current_id != SIZE_MAX && current_id != job_id) {
        ThreadPoolJob* current_result = FreeList_GetPointer(&pool->job_storage, current_id);
        previous_id = current_id;
        current_id = current_result->next;
    }

    if (current_id != SIZE_MAX) {
        ThreadPoolJob* job = FreeList_GetPointer(&pool->job_storage, job_id);

        // Store return value before the slot is released
        job_result[0] = job->result;

        // Unlink: either move the head or bridge over the removed node
        if (previous_id == SIZE_MAX) {
            pool->first_result = job->next;
        } else {
            ThreadPoolJob* previous_result = FreeList_GetPointer(&pool->job_storage, previous_id);
            previous_result->next = job->next;
        }

        // Removing the tail moves the tail to the predecessor; this is
        // SIZE_MAX when the list becomes empty.
        if (pool->last_result == job_id) pool->last_result = previous_id;

        // The slot is released no matter where in the list the job was
        FreeList_Free(&pool->job_storage, job_id);
        return_code = EXIT_SUCCESS;
    }

    OSMutex_Release(&pool->rw_lock);
    return return_code;
}
/**
 * Reads the id at the head of the pool's result list.
 *
 * @param pool Pool to inspect.
 * @return pool->first_result as read under the pool lock, or SIZE_MAX if
 *         the lock could not be acquired.
 */
size_t ThreadPool_GetNextResultID(ThreadPool* pool)
{
    size_t head_id = SIZE_MAX;

    if (OSMutex_Acquire(&pool->rw_lock) == EXIT_SUCCESS) {
        head_id = pool->first_result;
        OSMutex_Release(&pool->rw_lock);
    }
    return head_id;
}
/**
 * Tears down a pool: stops and joins every alive worker, then releases the
 * job storage, the worker array, the lock and the wait semaphore.
 *
 * @param thread_pool Pool to destroy; NULL is ignored. The structure must
 *                    not be used afterwards.
 */
void ThreadPool_Destroy(ThreadPool* thread_pool)
{
    if (thread_pool == NULL) return;

    // Hold the lock during teardown so no worker is inside a critical
    // section while it is killed. Teardown proceeds even if the lock cannot
    // be taken; only the matching release is skipped in that case.
    bool locked = OSMutex_Acquire(&thread_pool->rw_lock) == EXIT_SUCCESS;

    // Visit every slot: alive workers are not guaranteed to form a prefix
    // of the array, so the scan must not stop at the first dead one.
    for (size_t i = 0; i < thread_pool->worker_count; i++) {
        ThreadPoolWorker* worker = thread_pool->workers + i;
        if (!worker->alive) continue;

        void* ignored_result;
        OSThread_Kill(&worker->thread);
        OSThread_Join(&worker->thread, &ignored_result);
        OSThread_Destroy(&worker->thread);
        worker->alive = false;
    }

    // Only release the storage once no worker thread is left to use it
    FreeList_Destroy(&thread_pool->job_storage);
    Allocator_FreeArray(
        thread_pool->allocator,
        thread_pool->workers,
        sizeof(ThreadPoolWorker),
        thread_pool->worker_count
    );

    if (locked) OSMutex_Release(&thread_pool->rw_lock);
    OSMutex_Destroy(&thread_pool->rw_lock);
    OSSemaphore_Destroy(&thread_pool->wait_queue);
}