Skip to content

Instantly share code, notes, and snippets.

@samvv
Created August 31, 2024 13:55
Show Gist options
  • Save samvv/ae0a367181a9ca6a6dae902e073ee7af to your computer and use it in GitHub Desktop.
Save samvv/ae0a367181a9ca6a6dae902e073ee7af to your computer and use it in GitHub Desktop.
Small thread-safe queue in C
#ifndef TSQ_H
#define TSQ_H
#include <stdlib.h>
#include <pthread.h>
#ifndef TSQ_MAX_SIZE
#define TSQ_MAX_SIZE 1024
#endif
struct tsq {
void** buffer;
size_t capacity;
size_t front;
size_t rear;
size_t size;
pthread_mutex_t mutex;
pthread_cond_t not_full;
pthread_cond_t not_empty;
};
inline void tsq_init(struct tsq* out) {
out->buffer = (void**)malloc(TSQ_MAX_SIZE * sizeof(void*));
out->capacity = TSQ_MAX_SIZE;
out->front = 0;
out->rear = 0;
out->size = 0;
out->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
out->not_full = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
out->not_empty = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
}
inline size_t tsq_count_dangerous(struct tsq* queue) {
size_t min = queue->front;
size_t max = queue->rear;
return min < max ? max - min : min - max;
}
inline void* tsq_pop_front(struct tsq* deque) {
pthread_mutex_lock(&deque->mutex);
while (deque->size == 0) {
pthread_cond_wait(&deque->not_empty, &deque->mutex);
}
void *value = deque->buffer[deque->front];
deque->front = (deque->front + 1) % deque->capacity;
deque->size--;
pthread_cond_signal(&deque->not_full);
pthread_mutex_unlock(&deque->mutex);
return value;
}
inline void* tsq_pop_back(struct tsq* deque) {
pthread_mutex_lock(&deque->mutex);
while (deque->size == 0) {
pthread_cond_wait(&deque->not_empty, &deque->mutex);
}
deque->rear = (deque->rear == 0) ? deque->capacity - 1 : deque->rear - 1;
void *value = deque->buffer[deque->rear];
deque->size--;
pthread_cond_signal(&deque->not_full);
pthread_mutex_unlock(&deque->mutex);
return value;
}
inline int tsq_push_front(struct tsq* deque, void *value) {
pthread_mutex_lock(&deque->mutex);
while (deque->size == deque->capacity) {
pthread_cond_wait(&deque->not_full, &deque->mutex);
}
deque->front = (deque->front == 0) ? deque->capacity - 1 : deque->front - 1;
deque->buffer[deque->front] = value;
deque->size++;
pthread_cond_signal(&deque->not_empty);
pthread_mutex_unlock(&deque->mutex);
return 0;
}
inline int tsq_push_back(struct tsq *deque, void *value) {
pthread_mutex_lock(&deque->mutex);
while (deque->size == deque->capacity) {
pthread_cond_wait(&deque->not_full, &deque->mutex);
}
deque->buffer[deque->rear] = value;
deque->rear = (deque->rear + 1) % deque->capacity;
deque->size++;
pthread_cond_signal(&deque->not_empty);
pthread_mutex_unlock(&deque->mutex);
return 0;
}
inline void tsq_destroy(struct tsq* queue) {
free(queue->buffer);
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->not_empty);
}
#endif /* of TSQ_H */
#include "gtest/gtest.h"
#include <cstdio>
#include <unistd.h>
#define TSQ_MAX_SIZE 5
#include "tsq.h"
#include <iostream>
#include <thread>
TEST(TsqQueue, PushPopSingleThread) {
struct tsq t;
tsq_init(&t);
std::thread popper([&] {
ASSERT_EQ((ptrdiff_t)tsq_pop_back(&t), 3);
ASSERT_EQ((ptrdiff_t)tsq_pop_back(&t), 2);
ASSERT_EQ((ptrdiff_t)tsq_pop_back(&t), 1);
});
tsq_push_back(&t, (void*)1);
tsq_push_back(&t, (void*)2);
tsq_push_back(&t, (void*)3);
popper.join();
}
TEST(TsqQueue, PushPopOverflow) {
struct tsq t;
tsq_init(&t);
std::thread popper([&] {
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 1);
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 2);
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 3);
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 4);
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 5);
ASSERT_EQ((ptrdiff_t)tsq_pop_front(&t), 6);
});
tsq_push_back(&t, (void*)1);
tsq_push_back(&t, (void*)2);
tsq_push_back(&t, (void*)3);
tsq_push_back(&t, (void*)4);
tsq_push_back(&t, (void*)5);
tsq_push_back(&t, (void*)6);
popper.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment