Skip to content

Instantly share code, notes, and snippets.

@victoroliveirab
Last active November 22, 2022 10:59
Show Gist options
  • Save victoroliveirab/2ecdba1525dec887329ba4bfd73dd253 to your computer and use it in GitHub Desktop.
Save victoroliveirab/2ecdba1525dec887329ba4bfd73dd253 to your computer and use it in GitHub Desktop.
Threadpool with pthreads, semaphore and mutex lock
/**
* Example client program that uses the thread pool.
*/
#include <stdio.h>
#include <unistd.h>
#include "threadpool.h"
/*
 * Payload for one addition task: the two operands that add() prints and sums.
 *
 * Field order matters: when the pool is built with DEBUG > 1 its debug
 * output reads the task payload as two consecutive ints (a first, then b).
 */
struct data
{
/* first operand */
int a;
/* second operand */
int b;
};
/*
 * Task callback: treats param as a pointer to struct data and prints both
 * operands together with their sum.
 */
void add(void *param)
{
    struct data *operands = param;
    int sum = operands->a + operands->b;

    printf("I add two values %d and %d result = %d\n", operands->a, operands->b, sum);
}
int main(void)
{
// create some work to do
struct data work0;
work0.a = 5;
work0.b = 10;
struct data work1;
work1.a = 15;
work1.b = 110;
struct data work2;
work2.a = 55;
work2.b = 150;
struct data work3;
work3.a = 155;
work3.b = 1105;
struct data work4;
work4.a = 25;
work4.b = 210;
struct data work5;
work5.a = 158;
work5.b = 1108;
struct data work6;
work6.a = 9;
work6.b = 99;
struct data work7;
work7.a = 1;
work7.b = 11;
// initialize the thread pool
pool_init();
// submit the work to the queue
pool_submit(&add,&work0);
pool_submit(&add,&work1);
pool_submit(&add,&work2);
pool_submit(&add,&work3);
pool_submit(&add,&work4);
pool_submit(&add,&work5);
pool_submit(&add,&work6);
pool_submit(&add,&work7);
// may be helpful
// sleep(1);
pool_shutdown();
return 0;
}
# Makefile for the thread pool example.
#
# Targets:
#   all   - build the "test" executable from client.o and threadpool.o
#   clean - remove the object files and the executable
CC=gcc
CFLAGS=-Wall
PTHREADS=-lpthread

# Neither target names a file that gets created, so always run them.
.PHONY: all clean

all: client.o threadpool.o
	$(CC) $(CFLAGS) -o test client.o threadpool.o $(PTHREADS)

# client.c includes threadpool.h, so it must be rebuilt when the header changes.
# The pthread library is only needed at link time, not for -c.
client.o: client.c threadpool.h
	$(CC) $(CFLAGS) -c client.c

threadpool.o: threadpool.c threadpool.h
	$(CC) $(CFLAGS) -c threadpool.c

clean:
	rm -rf *.o
	rm -rf test
/**
* Implementation of thread pool.
*/
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "threadpool.h"
// ---- compile-time configuration ----
#define QUEUE_SIZE 10 // not used in this code
// number of worker threads pool_init() starts; also the initial semaphore count
#define NUMBER_OF_THREADS 3
// loop condition for the workers' endless loop
#define TRUE 1
#define DEBUG 1 // 0 - no debug message, 1 - main points, 2 - all
#define LIFE 2 // amount of seconds threads will live
// ---- file-scope state shared by the functions below ----
// not referenced by the functions in this file: each one that mentions
// "worktodo" declares a local of the same name that shadows this object
struct task worktodo;
// the single pool instance; allocated and filled in by pool_init()
struct threadpool *pool;
// timestamp taken in pool_init(); workers compare the current time against it
time_t begin;
// thread handle written by pthread_create() in pool_init(); after the
// start-up loop it holds the handle of the last thread created
pthread_t bee; // the worker bee
// id given to the next submitted task; incremented by pool_submit()
int i = 0; // task_id debugging
// id handed to the next worker thread; incremented by pool_init()
int j = 0; // thread_id debugging
/*
 * Appends task t to the tail of the pool's queue and posts the pool
 * semaphore once. Always returns 0.
 *
 * The function takes no lock itself and leaves t->next untouched; its
 * caller in this file, pool_submit(), holds pool->mutex around the call
 * and clears t->next beforehand.
 */
int enqueue(struct task *t)
{
    const int queue_was_empty = (pool->head == NULL);

    if (queue_was_empty) {
        if (DEBUG > 1) printf("Pool is currently empty...\n");
        pool->head = t; // first element is both head and tail
    } else {
        if (DEBUG > 1) printf("Pool isn't currently empty...\n");
        pool->tail->next = t; // link behind the current last element
    }
    pool->tail = t;

    if (DEBUG > 1) printf("Task with id = %d added...\n", t->task_id);
    sem_post(&pool->semaphore);
    return 0;
}
/*
 * Removes the task at the head of the pool's queue and returns it, or
 * returns NULL when the queue is empty.
 *
 * The function takes no lock itself; the caller is expected to hold
 * pool->mutex. The returned task is detached from the queue (its next
 * pointer is cleared), so the caller cannot reach tasks that are still
 * queued through it.
 */
struct task *dequeue(void)
{
    struct task *first = pool->head;

    if (first == NULL) { // dequeue was requested on an empty queue
        return NULL;
    }

    pool->head = first->next;
    if (pool->head == NULL) { // that was the last element: queue is now empty
        pool->tail = NULL;
    }

    first->next = NULL;
    return first;
}
/*
 * Worker thread entry point.
 *
 * param carries the thread's numeric id by value (pool_init() casts an int
 * to void *). The thread repeatedly takes the task at the head of the queue,
 * runs it outside the mutex and releases the task node, until more than
 * pool->timespan seconds have passed since `begin`.
 *
 * Returns NULL.
 */
void *worker(void *param)
{
    // Go through intptr_t: converting a pointer straight to int truncates
    // (and draws a compiler warning) where pointers are wider than int.
    int thread_id = (int) (intptr_t) param;
    int num_of_works = 0;

    // thread_id and num_of_works are ints, hence %d in the messages below
    if (DEBUG) printf("Start work of thread %d...\n", thread_id);

    while (TRUE) {
        if (time(NULL) - begin > pool->timespan) break; // stop condition

        sem_wait(&pool->semaphore);
        pthread_mutex_lock(&pool->mutex);
        if (DEBUG && pool->head) printf(".locking in thread %d...\n", thread_id);

        struct task *task = dequeue(); // NULL when nothing is queued
        if (task == NULL) {
            pthread_mutex_unlock(&pool->mutex);
        } else {
            ++num_of_works;
            // debug output assumes the payload starts with two ints,
            // as the example client's struct data does
            if (DEBUG > 1) printf("Thread %d got job of adding %d + %d...\n", thread_id, *((int*) task->data), *((int*) task->data + 1));
            pthread_mutex_unlock(&pool->mutex);
            if (DEBUG) printf(".unlocking in thread %d...\n", thread_id);

            // run the task without holding the mutex
            execute(task->function, task->data);

            // the node was malloc'ed by pool_submit() and nobody else
            // references it once it has left the queue
            free(task);
        }

        // NOTE(review): every pass re-posts the semaphore it waited on, so
        // an idle worker keeps looping instead of sleeping until work
        // arrives - confirm this polling behaviour is intended.
        sem_post(&pool->semaphore);
    }

    if (DEBUG) printf("Closing thread %d, that performed %d works...\n", thread_id, num_of_works);
    return NULL;
}
/**
 * Runs one task: calls somefunction with p as its only argument.
 */
void execute(void (*somefunction)(void *p), void *p)
{
    somefunction(p);
}
/**
 * Submits work to the pool.
 *
 * Wraps somefunction and p in a newly allocated task, gives it the next
 * task id and appends it to the queue while holding the pool mutex.
 *
 * @param somefunction callback a worker will run
 * @param p            argument passed to somefunction; only the pointer is
 *                     stored, so the object must stay valid until the task
 *                     has run
 * @return 0 on success, -1 if the task could not be allocated
 */
int pool_submit(void (*somefunction)(void *p), void *p)
{
    // Allocate and fill the node before taking the lock so the critical
    // section only covers the shared state (task counter and queue).
    struct task *job = malloc(sizeof *job);
    if (job == NULL) {
        return -1;
    }
    job->function = somefunction;
    job->data = p;
    job->next = NULL;

    pthread_mutex_lock(&pool->mutex);
    if (DEBUG > 1) printf("New task in the pool... Locking\n");
    job->task_id = i;
    ++i;
    if (DEBUG > 1) {
        // debug output assumes p points at two consecutive ints,
        // as the example client's struct data does
        printf("work.a = %d ... ", *((int*) job->data));
        printf("work.b = %d\n", *((int*) job->data + 1));
    }
    enqueue(job);
    if (DEBUG > 1) printf("Enqueued. Unlocking execution...\n");
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
// initialize the thread pool
void pool_init(void)
{
pool = (struct threadpool*) malloc(sizeof(struct threadpool));
pool->head = NULL;
pool->tail = NULL;
pool->timespan = LIFE; //seconds
pthread_mutex_init(&pool->mutex, NULL);
sem_init(&pool->semaphore, 0, NUMBER_OF_THREADS);
begin = time(NULL);
for (int i = 0; i < NUMBER_OF_THREADS; ++i, ++j) {
pthread_create(&bee, NULL, worker, (void *) j);
}
}
// shutdown the thread pool
void pool_shutdown(void)
{
pthread_join(bee,NULL);
if (DEBUG) printf("End of execution :)\n");
}
/**
 * Public interface of the thread pool.
 */
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <pthread.h>
#include <semaphore.h>

// function prototypes

/* Runs one task: calls somefunction(p). */
void execute(void (*somefunction)(void *p), void *p);

/*
 * Queues somefunction(p) for execution by a worker. Only the pointer p is
 * stored. Returns 0 on success.
 */
int pool_submit(void (*somefunction)(void *p), void *p);

/* Worker thread entry point; param carries the thread's numeric id. */
void *worker(void *param);

/*
 * Allocates the pool, initializes its mutex and semaphore and starts the
 * worker threads. Must be called before pool_submit().
 */
void pool_init(void);

/* Joins on the pool's worker thread(s) with pthread_join(). */
void pool_shutdown(void);

/* One queued unit of work; a node of the pool's singly linked queue. */
struct task
{
    void (*function)(void *p); // callback to run
    void *data;                // argument handed to the callback
    struct task *next;         // next task in the queue, NULL for the last one
    int task_id;               // debug purpose
};

/* The pool: a task queue plus the primitives that guard and signal it. */
struct threadpool
{
    struct task *head;         // oldest queued task, NULL when the queue is empty
    struct task *tail;         // newest queued task, NULL when the queue is empty
    pthread_mutex_t mutex;     // held while the queue is read or modified
    sem_t semaphore;           // posted by enqueue(), waited on by worker()
    int timespan;              // seconds after pool_init() beyond which workers stop
};

#endif /* THREADPOOL_H */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment