Created
August 31, 2024 13:55
-
-
Save samvv/ae0a367181a9ca6a6dae902e073ee7af to your computer and use it in GitHub Desktop.
Small thread-safe queue in C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef TSQ_H | |
#define TSQ_H | |
#include <stdlib.h> | |
#include <pthread.h> | |
#ifndef TSQ_MAX_SIZE | |
#define TSQ_MAX_SIZE 1024 | |
#endif | |
/* Bounded double-ended queue of void* items stored in a ring buffer and
 * guarded by one mutex plus two condition variables. */
struct tsq { | |
/* Ring-buffer storage; allocated with malloc in tsq_init, freed in tsq_destroy. */
void** buffer; | |
/* Number of slots in buffer (set to TSQ_MAX_SIZE by tsq_init). */
size_t capacity; | |
/* Index of the front element; tsq_pop_front reads buffer[front]. */
size_t front; | |
/* Index one past the back element; tsq_push_back writes buffer[rear]. */
size_t rear; | |
/* Number of elements currently stored. */
size_t size; | |
/* Held by every push/pop while it touches the fields above. */
pthread_mutex_t mutex; | |
/* Pushers wait on this while size == capacity; pops signal it. */
pthread_cond_t not_full; | |
/* Poppers wait on this while size == 0; pushes signal it. */
pthread_cond_t not_empty; | |
}; | |
inline void tsq_init(struct tsq* out) { | |
out->buffer = (void**)malloc(TSQ_MAX_SIZE * sizeof(void*)); | |
out->capacity = TSQ_MAX_SIZE; | |
out->front = 0; | |
out->rear = 0; | |
out->size = 0; | |
out->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; | |
out->not_full = (pthread_cond_t)PTHREAD_COND_INITIALIZER; | |
out->not_empty = (pthread_cond_t)PTHREAD_COND_INITIALIZER; | |
} | |
inline size_t tsq_count_dangerous(struct tsq* queue) { | |
size_t min = queue->front; | |
size_t max = queue->rear; | |
return min < max ? max - min : min - max; | |
} | |
inline void* tsq_pop_front(struct tsq* deque) { | |
pthread_mutex_lock(&deque->mutex); | |
while (deque->size == 0) { | |
pthread_cond_wait(&deque->not_empty, &deque->mutex); | |
} | |
void *value = deque->buffer[deque->front]; | |
deque->front = (deque->front + 1) % deque->capacity; | |
deque->size--; | |
pthread_cond_signal(&deque->not_full); | |
pthread_mutex_unlock(&deque->mutex); | |
return value; | |
} | |
inline void* tsq_pop_back(struct tsq* deque) { | |
pthread_mutex_lock(&deque->mutex); | |
while (deque->size == 0) { | |
pthread_cond_wait(&deque->not_empty, &deque->mutex); | |
} | |
deque->rear = (deque->rear == 0) ? deque->capacity - 1 : deque->rear - 1; | |
void *value = deque->buffer[deque->rear]; | |
deque->size--; | |
pthread_cond_signal(&deque->not_full); | |
pthread_mutex_unlock(&deque->mutex); | |
return value; | |
} | |
inline int tsq_push_front(struct tsq* deque, void *value) { | |
pthread_mutex_lock(&deque->mutex); | |
while (deque->size == deque->capacity) { | |
pthread_cond_wait(&deque->not_full, &deque->mutex); | |
} | |
deque->front = (deque->front == 0) ? deque->capacity - 1 : deque->front - 1; | |
deque->buffer[deque->front] = value; | |
deque->size++; | |
pthread_cond_signal(&deque->not_empty); | |
pthread_mutex_unlock(&deque->mutex); | |
return 0; | |
} | |
inline int tsq_push_back(struct tsq *deque, void *value) { | |
pthread_mutex_lock(&deque->mutex); | |
while (deque->size == deque->capacity) { | |
pthread_cond_wait(&deque->not_full, &deque->mutex); | |
} | |
deque->buffer[deque->rear] = value; | |
deque->rear = (deque->rear + 1) % deque->capacity; | |
deque->size++; | |
pthread_cond_signal(&deque->not_empty); | |
pthread_mutex_unlock(&deque->mutex); | |
return 0; | |
} | |
inline void tsq_destroy(struct tsq* queue) { | |
free(queue->buffer); | |
pthread_mutex_destroy(&queue->mutex); | |
pthread_cond_destroy(&queue->not_empty); | |
} | |
#endif /* of TSQ_H */ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "gtest/gtest.h" | |
#include <cstdio> | |
#include <unistd.h> | |
#define TSQ_MAX_SIZE 5 | |
#include "tsq.h" | |
#include <iostream> | |
#include <thread> | |
// Pushes three values at the back and pops them from the back on the same
// thread, expecting them to come out in reverse (LIFO) order.
//
// All pushes happen before the first pop: popping from a second thread while
// the pushes are still in flight makes the 3,2,1 order depend on scheduling.
TEST(TsqQueue, PushPopSingleThread) {
    struct tsq t;
    tsq_init(&t);

    tsq_push_back(&t, (void*)1);
    tsq_push_back(&t, (void*)2);
    tsq_push_back(&t, (void*)3);

    // EXPECT (not ASSERT) so that a mismatch still reaches tsq_destroy below.
    EXPECT_EQ((ptrdiff_t)tsq_pop_back(&t), 3);
    EXPECT_EQ((ptrdiff_t)tsq_pop_back(&t), 2);
    EXPECT_EQ((ptrdiff_t)tsq_pop_back(&t), 1);

    tsq_destroy(&t);
}
// Pushes six values into a queue whose capacity is TSQ_MAX_SIZE (5 in this
// file), so the producer has to block on the sixth push until the consumer
// thread has popped at least one element. With one producer using push_back
// and one consumer using pop_front the values must arrive in FIFO order.
TEST(TsqQueue, PushPopOverflow) {
    struct tsq t;
    tsq_init(&t);

    std::thread popper([&] {
        // EXPECT (not ASSERT) so the consumer always drains all six items
        // even when one comparison fails.
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 1);
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 2);
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 3);
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 4);
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 5);
        EXPECT_EQ((ptrdiff_t)tsq_pop_front(&t), 6);
    });

    tsq_push_back(&t, (void*)1);
    tsq_push_back(&t, (void*)2);
    tsq_push_back(&t, (void*)3);
    tsq_push_back(&t, (void*)4);
    tsq_push_back(&t, (void*)5);
    tsq_push_back(&t, (void*)6);

    popper.join();

    // Safe to tear down: the only other user of the queue has been joined.
    tsq_destroy(&t);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment