Last active
January 31, 2024 16:13
-
-
Save Voldrix/5f48c9b729b9de1b36b465f4f9a0290a to your computer and use it in GitHub Desktop.
multithreaded signal driven IO without multiplexing template in C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string.h> | |
#include <unistd.h> | |
#include <pthread.h> | |
#include <netinet/in.h> | |
#include <sys/socket.h> | |
#include <signal.h> | |
#define THREADS 4 | |
#define QUEUE_DEPTH 32 //power of 2 | |
#define BUFFER_SIZE 2048 | |
#define SOCKET_PORT 8080 | |
sigset_t signal_mask; | |
int queue[THREADS * QUEUE_DEPTH] = {0}; | |
int qtail[THREADS] = {0}, qhead = 0, threadPtr = 0; | |
char buffers[THREADS * BUFFER_SIZE]; | |
const char *headers = "HTTP/1.0 200 OK\r\ncache-control: no-store\r\nAccess-Control-Allow-Origin: *\r\nConnection: close\r\ncontent-type: text/plain; charset=utf-8\r\ncontent-length: 0\r\n\r\n"; | |
void parse_request(int fd, char *buff) { | |
int retLen = strlen(headers); | |
memcpy(buff, headers, retLen); | |
write(fd, buff, retLen); | |
close(fd); | |
} | |
void* worker_thread(void* threadNo) { | |
int threadNum = (long long)threadNo; | |
int fd, bytesRead, sig_caught, *tail = &qtail[threadNum], *_queue = &queue[QUEUE_DEPTH * threadNum]; | |
char *buff = &buffers[BUFFER_SIZE * threadNum]; | |
while(1) { | |
if(_queue[*tail] == 0) //no new connection queued | |
sigwait(&signal_mask, &sig_caught); | |
fd = _queue[*tail]; | |
_queue[*tail] = 0; | |
*tail += 1; | |
*tail &= QUEUE_DEPTH - 1; //rollover | |
bytesRead = read(fd, buff, sizeof(buff) - 1); | |
if(bytesRead <= 0) //read error or EOF | |
continue; | |
buff[bytesRead] = 0; //null terminate str | |
parse_request(fd, buff); | |
} | |
return 0; | |
} | |
int main(void) { | |
daemon(0, 0); | |
sigemptyset(&signal_mask); | |
sigaddset(&signal_mask, SIGCONT); | |
sigprocmask(SIG_SETMASK, &signal_mask, NULL); //block SIGCONT (inherited by all threads) | |
//THREAD POOL | |
pthread_t thread[THREADS]; | |
for(long long coreOffset = 0; coreOffset < THREADS; coreOffset++) | |
pthread_create(&thread[coreOffset], NULL, &worker_thread, (void*)coreOffset); | |
//NETWORKING LESTENING SOCKET | |
int listen_sd, new_sd, opt = 1; | |
struct sockaddr_in address; | |
int addrlen = sizeof(address); | |
if((listen_sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) | |
return 1; | |
if(setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) | |
return 1; | |
memset(&address, 0, sizeof(address)); | |
address.sin_family = AF_INET; | |
address.sin_addr.s_addr = INADDR_ANY; | |
address.sin_port = htons(SOCKET_PORT); | |
if(bind(listen_sd, (struct sockaddr*)&address, sizeof(address)) < 0) | |
return 1; | |
if(listen(listen_sd, 32) < 0) | |
return 1; | |
//network listen loop | |
for(;;) { | |
new_sd = accept(listen_sd, (struct sockaddr*)&address, &addrlen); | |
if(new_sd == -1) continue; | |
queue[threadPtr * QUEUE_DEPTH + qhead] = new_sd; //add new conn to queue | |
if(qtail[threadPtr] == qhead) //if thread's queue is empty (thread is asleep), send signal | |
pthread_kill(thread[threadPtr], SIGCONT); | |
threadPtr += 1; | |
//rollover incremented ptrs | |
qhead += (threadPtr == THREADS) ? 1 : 0; | |
threadPtr = (threadPtr == THREADS) ? 0 : threadPtr; | |
qhead = (qhead == QUEUE_DEPTH) ? 0 : qhead; | |
} | |
shutdown(listen_sd, SHUT_RDWR); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment