Created
April 10, 2024 15:42
-
-
Save apconole/5d9985eeab495455c0e6c9c08e017086 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
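/* Copyright (C) 2024 - Red Hat, Inc.
 * Author: Aaron Conole <aconole@redhat.com>
 * Licensed under GPLv2
 *
 * UDP sink that counts received datagram bytes per frame-size bucket and
 * periodically reports per-bucket and total receive rates.
 */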
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
/* Upper bound on the number of frame-size buckets a run may configure. */
#define BUCKETS    16
/* NOTE(review): not referenced anywhere in this file. */
#define MAX_EVENTS 16

/* Readiness-detection strategy, selected on the command line with -m. */
enum polling_mechanism {
    PM_USE_READ_NB = 0, /* "read":   non-blocking MSG_PEEK probe   */
    PM_USE_SELECT  = 1, /* "select": select() + FIONREAD           */
    PM_USE_POLL    = 2, /* "poll":   poll()                        */
    PM_USE_EPOLL   = 3, /* "epoll":  epoll_wait()                  */
};
struct bucket { | |
uint64_t frame_size; | |
uint64_t bytes; | |
uint64_t last_time_rx; | |
}; | |
struct thread_data { | |
struct bucket buckets[BUCKETS]; | |
int16_t verbose; | |
uint16_t port; | |
uint64_t end_time; | |
uint8_t core_idx; | |
enum polling_mechanism poll_type; | |
pthread_t thread_handle; | |
}; | |
static struct thread_data *byte_ctrs; | |
/* Time-unit conversion factors shared by the sleep and clock helpers. */
enum {
    NSEC_PER_USEC = 1000, /* nanoseconds per microsecond */
    MS_PER_SEC    = 1000, /* milliseconds per second */
    US_PER_MS     = 1000, /* microseconds per millisecond */
};
/*
 * Relative sleep on CLOCK_MONOTONIC for the duration in *req.
 * Returns clock_nanosleep()'s result unchanged. Per POSIX, that is 0 on
 * success or an error number (not -1/errno). When the sleep is interrupted,
 * the unslept time is written to *rem.
 */
static int __nsleep(const struct timespec *req, struct timespec *rem)
{
    const int relative = 0; /* no TIMER_ABSTIME: *req is a duration */

    return clock_nanosleep(CLOCK_MONOTONIC, relative, req, rem);
}
static void pend_us(uint32_t usec) | |
{ | |
struct timespec req={0},rem={0}; | |
time_t sec = (int)(usec / (US_PER_MS * MS_PER_SEC)); | |
usec = usec - (sec * (US_PER_MS * MS_PER_SEC)); | |
req.tv_sec = sec; | |
req.tv_nsec = usec * NSEC_PER_USEC; | |
(void)__nsleep(&req, &rem); | |
} | |
/*
 * Sleep for 'sec' whole seconds on CLOCK_MONOTONIC.
 *
 * The timespec is built directly rather than going through pend_us().
 * Converting to microseconds first (sec * 1000000, evaluated in 32-bit
 * unsigned arithmetic) wraps for sec > 4294 and silently shortens long
 * sleeps such as "-t 5000".
 */
static void pend_s(uint32_t sec)
{
    struct timespec req = { .tv_sec = (time_t)sec, .tv_nsec = 0 };
    struct timespec rem = { 0 };

    (void)__nsleep(&req, &rem);
}
static uint64_t time_ticks_ms() | |
{ | |
uint64_t value = 0; | |
struct timespec currentTime; | |
if (clock_gettime(CLOCK_MONOTONIC, ¤tTime)) { | |
return 0; /* 0 ticks... uh-oh */ | |
} | |
value = currentTime.tv_sec * MS_PER_SEC; | |
value += currentTime.tv_nsec / (NSEC_PER_USEC * US_PER_MS); | |
return value; | |
} | |
static size_t *bucket_sizes; | |
static size_t num_buckets; | |
static int get_bucket(uint64_t length) | |
{ | |
for (size_t i = 0; i < num_buckets && i < BUCKETS; ++i) { | |
if (length <= bucket_sizes[i]) { | |
return i; | |
} | |
} | |
return num_buckets - 1; | |
} | |
#define GBIT (1ULL << 30)
#define MBIT (1ULL << 20)
#define KBIT (1ULL << 10)

/*
 * Format the average bit rate of 'bytes' received over 'seconds' as text
 * such as "12.34 Mbps".
 *
 * Prefixes are binary multiples (Kbps = 2^10, Mbps = 2^20, Gbps = 2^30
 * bits/s), matching KBIT/MBIT/GBIT. A zero 'seconds' yields a rate of 0
 * instead of inf/nan. Below 1 Kbps the unit is "bps": the value is still
 * bits, not bytes, per second.
 *
 * Returns a pointer to a static buffer that the next call overwrites, so
 * the function is neither reentrant nor thread-safe.
 */
static const char *get_human_rate(uint64_t bytes, uint64_t seconds)
{
    static char human_rate[80];
    double rate_bps = seconds ? 8.0 * (double)bytes / (double)seconds : 0.0;
    const char *unit;

    if (rate_bps >= GBIT) {
        rate_bps /= GBIT;
        unit = "Gbps";
    } else if (rate_bps >= MBIT) {
        rate_bps /= MBIT;
        unit = "Mbps";
    } else if (rate_bps >= KBIT) {
        rate_bps /= KBIT;
        unit = "Kbps";
    } else {
        unit = "bps";
    }

    snprintf(human_rate, sizeof human_rate, "%.2f %s", rate_bps, unit);
    return human_rate;
}
static void report(struct bucket *buckets, int verb, uint64_t start_time_ms) | |
{ | |
uint64_t elapsed_time = (time_ticks_ms() - start_time_ms) / MS_PER_SEC; | |
uint64_t total_bw = 0; | |
if (verb < 0) return; /* skip reporting when verb is quiet */ | |
for (size_t i = 0; i < num_buckets; i++) { | |
total_bw += buckets[i].bytes; | |
if (buckets[i].bytes || verb > 0) | |
printf("Bucket %lu(%lu): %lu bytes in %lus (rate: %f bits/s)\n", | |
i, bucket_sizes[i], buckets[i].bytes, elapsed_time, | |
8 * (double)buckets[i].bytes / (double)elapsed_time); | |
} | |
if (verb > 0) | |
printf("+++ Total bw: %lu bytes in %lus = %s (rate %f bits/s)\n", | |
total_bw, elapsed_time, get_human_rate(total_bw, elapsed_time), | |
8.0 * (double) total_bw / (double) elapsed_time); | |
} | |
static int prepare_socket(int socket, enum polling_mechanism pm) | |
{ | |
int flags = fcntl(socket, F_GETFL, 0); | |
if (flags == -1) { | |
return -1; | |
} | |
/* For select, we won't use a non-blocking socket because it will just | |
* always return 'true'. There is still an ioctl check for # bytes, but | |
* this might exercise the select method more. */ | |
if (pm == PM_USE_SELECT) | |
return 0; | |
if (fcntl(socket, F_SETFL, flags | O_NONBLOCK) == -1) { | |
return -1; | |
} | |
return 0; | |
} | |
/*
 * Non-blocking readiness probe: peek at one byte without consuming it.
 * Returns 1 when recv() reports data (a datagram is queued),
 * 0 when the receive queue is empty (EAGAIN/EWOULDBLOCK), and
 * -1 on any other recv() error.
 */
static int check_nb_socket(int socket)
{
    char peek_byte;

    if (recv(socket, &peek_byte, sizeof peek_byte,
             MSG_PEEK | MSG_DONTWAIT) != -1)
        return 1;

    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
}
/*
 * Wait up to 500 ms for one event on 'epoll_fd'.
 * Returns 0 when epoll_fd is -1, on timeout, or when the event lacks
 * EPOLLIN. Returns -1 if epoll_wait() fails, and 1 when a registered
 * descriptor is readable.
 */
static int epoll_udp_result(int epoll_fd)
{
    struct epoll_event ready;
    int count;

    if (epoll_fd == -1)
        return 0;

    count = epoll_wait(epoll_fd, &ready, 1, 500);
    if (count <= 0)
        return count;

    return (ready.events & EPOLLIN) ? 1 : 0;
}
/*
 * Wait up to 500 ms for 'socket' to become readable using poll().
 * Returns 1 when POLLIN is reported, 0 on timeout or other revents, and
 * -1 if poll() fails.
 */
static int poll_udp_result(int socket)
{
    struct pollfd pfd = { .fd = socket, .events = POLLIN, .revents = 0 };
    int rc = poll(&pfd, 1, 500);

    if (rc > 0)
        return (pfd.revents & POLLIN) != 0;

    return rc;
}
static int select_udp_result(int socket) | |
{ | |
struct timeval tv; | |
fd_set fds; | |
unsigned int bytes = 0; | |
tv.tv_sec = 0; | |
tv.tv_usec = 500 * US_PER_MS; | |
FD_ZERO(&fds); | |
FD_SET(socket, &fds); | |
if (select(socket + 1, &fds, NULL, NULL, &tv) > 0) { | |
/* check if recv would return bytes */ | |
if (ioctl(socket, FIONREAD, &bytes) < 0) { | |
return -1; | |
} | |
return bytes > 0; | |
} | |
return 0; | |
} | |
static int ready_to_read(int socket, int epoll_fd, | |
enum polling_mechanism pm) | |
{ | |
/* Each case will fallthrough on 'failure' */ | |
switch (pm) { | |
case PM_USE_EPOLL: | |
if (epoll_udp_result(epoll_fd) > 0) | |
return 1; | |
/* fallthrough */ | |
case PM_USE_POLL: | |
if (poll_udp_result(socket) > 0) | |
return 1; | |
/* fallthrough */ | |
case PM_USE_SELECT: | |
if (select_udp_result(socket) > 0) | |
return 1; | |
/* fallthrough */ | |
case PM_USE_READ_NB: | |
/* fallthrough */ | |
default: | |
return check_nb_socket(socket); | |
} | |
} | |
/*
 * Register 'socket' with 'epoll_fd' for edge-triggered read readiness.
 * Returns epoll_ctl()'s result: 0 on success, -1 with errno set on failure.
 */
static int add_to_epoll(int epoll_fd, int socket)
{
    struct epoll_event ev = {
        .events = EPOLLIN | EPOLLET,
        .data = { .fd = socket },
    };

    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket, &ev);
}
static void *udp_listener(void *arg) { | |
struct thread_data *td = (struct thread_data *)arg; | |
int optval = 1, udp_socket, res; | |
int epollfd = -1; | |
struct sockaddr_in sa; | |
if (td->core_idx) { | |
pid_t curr = gettid(); | |
cpu_set_t cpus; | |
CPU_ZERO(&cpus); | |
CPU_SET(td->core_idx, &cpus); | |
sched_setaffinity(curr, sizeof(cpus), &cpus); | |
} | |
thr_restart: | |
udp_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); | |
if (udp_socket < 0) { | |
printf("THREAD-FATAL: Failed to allocate socket (%s).\n", | |
strerror(errno)); | |
return NULL; | |
} | |
if (setsockopt(udp_socket, SOL_SOCKET, SO_REUSEPORT, &optval, | |
sizeof(optval)) < 0) { | |
printf("THREAD-FATAL: Failed to setsockopt (%s).\n", strerror(errno)); | |
close(udp_socket); | |
return NULL; | |
} | |
if (prepare_socket(udp_socket, td->poll_type) < 0) { | |
printf("THREAD-FATAL: Failed to prepare: %s", strerror(errno)); | |
close(udp_socket); | |
return NULL; | |
} | |
/* XXX: Add a v6 option here as well. */ | |
memset(&sa, 0, sizeof sa); | |
sa.sin_family = AF_INET; | |
sa.sin_port = htons(td->port); | |
sa.sin_addr.s_addr = htonl(INADDR_ANY); | |
if (bind(udp_socket, (struct sockaddr *)&sa, sizeof sa) < 0) { | |
if (td->verbose != -1) { | |
printf("ERROR: Failed to bind (%s).\n", strerror(errno)); | |
} | |
close(udp_socket); | |
pend_s(1); | |
goto thr_restart; | |
} | |
if (td->poll_type == PM_USE_EPOLL) { | |
epollfd = epoll_create1(0); | |
if (epollfd >= 0) { | |
if (add_to_epoll(epollfd, udp_socket) < 0) { | |
close(epollfd); | |
epollfd = -1; | |
} | |
} | |
if (epollfd < 0 && td->verbose != -1) { | |
printf("ERROR: Couldn't setup epoll socket - using fallback.\n"); | |
} | |
} | |
if (td->verbose > 0 && epollfd >= 0) | |
printf("+++ Allocated epoll socket.\n"); | |
while (!td->end_time || (td->end_time > time_ticks_ms())) { | |
socklen_t len = sizeof(struct sockaddr_in); | |
struct sockaddr_in sar; | |
char buf[2048]; | |
int read = 0; | |
while (ready_to_read(udp_socket, epollfd, td->poll_type) > 0) { | |
read = 1; | |
do { | |
if ((res = recvfrom(udp_socket, &buf, 2048, 0, | |
(struct sockaddr *)&sar, | |
(socklen_t *)&len)) > 0) | |
td->buckets[get_bucket(res)].bytes += res; | |
} while (res > 0 && td->poll_type != PM_USE_SELECT); | |
if (res <= 0) { | |
if (errno == EAGAIN || errno == EWOULDBLOCK) | |
break; | |
if (td->verbose > 0) | |
printf("thread recvfrom failure. restarting socket.\n"); | |
close(epollfd); | |
close(udp_socket); | |
goto thr_restart; | |
} | |
} | |
if (!read) { | |
res = 0; | |
pend_us(500); | |
} | |
if (epollfd == -1 && ready_to_read(udp_socket, epollfd, | |
td->poll_type) < 0) { | |
if (td->verbose > 0) | |
printf("thread read ready failure. restarting socket.\n"); | |
close(epollfd); | |
close(udp_socket); | |
goto thr_restart; | |
} | |
} | |
close(epollfd); | |
close(udp_socket); | |
return NULL; | |
} | |
static void help(char *prgm) | |
{ | |
printf("Usage: %s [OPTIONS].. port\n", prgm); | |
printf("Simple UDP sink that tells how much packet data was sent " | |
"with heuristics.\n"); | |
printf("\n"); | |
printf("Mandatory arguments:\n"); | |
printf(" port Set the port on which to listen.\n"); | |
printf("\n"); | |
printf("Options and arguments:\n"); | |
printf(" -h, --help Print this help message and exit.\n"); | |
printf(" -v, --verbose Set the program to be verbose in output.\n"); | |
printf(" -t, --stat-time=SEC Set the stat print time in seconds.\n"); | |
printf(" -a, --stop-after=SEC Set the time to complete - default is " | |
"0 (Unlimited).\n"); | |
printf(" -H, --threads=THREADS Spawn THREADS number of threads for " | |
"listening (default = 1).\n"); | |
printf(" -F, --affinity Only when spawning threads, forces all " | |
"threads onto cores.\n"); | |
printf(" -m, --method=MTHD One of {epoll, poll, select, read}\n"); | |
printf(" -q, --quiet Set the program to suppress output.\n"); | |
exit(0); | |
} | |
static void parse_arguments(int argc, char *argv[], bool *affinity, | |
uint16_t *nr_threads, int *verbose, | |
uint64_t *secs_to_report, uint64_t *secs_to_run, | |
uint16_t *lport, char **bucket_string, | |
enum polling_mechanism *pt) | |
{ | |
int opt; | |
static struct option long_options[] = { | |
{"affinity", no_argument, NULL, 'F'}, | |
{"buckets", required_argument, NULL, 'k'}, | |
{"help", no_argument, NULL, 'h'}, | |
{"method", required_argument, NULL, 'm'}, | |
{"quiet", no_argument, NULL, 'q'}, | |
{"stat-time", required_argument, NULL, 't'}, | |
{"stop-after", required_argument, NULL, 'a'}, | |
{"threads", required_argument, NULL, 'H'}, | |
{"verbose", no_argument, NULL, 'v'}, | |
{NULL, 0, NULL, 0} | |
}; | |
*pt = PM_USE_EPOLL; | |
while ((opt = getopt_long(argc, argv, "Fhvqt:a:H:k:m:", long_options, | |
NULL)) != -1) { | |
if (opt == 0) | |
opt = long_options[optind].val; | |
switch (opt) { | |
case 'F': | |
*affinity = true; | |
break; | |
case 'k': | |
*bucket_string = strdup(optarg); | |
break; | |
case 'H': | |
*nr_threads = atoi(optarg); | |
break; | |
case 'm': | |
if (!strcmp(optarg, "poll")) | |
*pt = PM_USE_POLL; | |
else if (!strcmp(optarg, "epoll")) | |
*pt = PM_USE_EPOLL; | |
else if (!strcmp(optarg, "select")) | |
*pt = PM_USE_SELECT; | |
else if (!strcmp(optarg, "read")) | |
*pt = PM_USE_READ_NB; | |
else | |
help(argv[0]); | |
break; | |
case 'v': | |
*verbose = 1; | |
break; | |
case 'q': | |
*verbose = -1; | |
break; | |
case 't': | |
*secs_to_report = atoi(optarg); | |
break; | |
case 'a': | |
*secs_to_run = atoi(optarg); | |
break; | |
default: /* fallthrough */ | |
case 'h': | |
help(argv[0]); | |
break; | |
} | |
} | |
/* Parse non-option argument - which should be port. */ | |
if (optind < argc && argv[optind]) { | |
*lport = atoi(argv[optind]); | |
} else { | |
printf("%s: missing port\n", argv[0]); | |
help(argv[0]); | |
} | |
} | |
static void setup_buckets(const char *buckets, int32_t verb) | |
{ | |
char *current_tok; | |
char *save_ptr; | |
char *bucket_string = strdup(buckets); | |
if (bucket_string == NULL) { | |
printf("FATAL: Unable to allocate bucket string.\n"); | |
exit(-1); | |
} | |
for (current_tok = strtok_r(bucket_string, ",", &save_ptr); | |
current_tok != NULL && num_buckets < BUCKETS; | |
current_tok = strtok_r(NULL, ",", &save_ptr)) { | |
char *end_ptr = NULL; | |
bucket_sizes = realloc(bucket_sizes, sizeof(size_t) * (++num_buckets)); | |
if (bucket_sizes == NULL) { | |
printf("FATAL: Bucket setup failed: %s", strerror(errno)); | |
free(bucket_string); | |
exit(-1); | |
} | |
bucket_sizes[num_buckets - 1] = strtoul(current_tok, &end_ptr, 0); | |
if (end_ptr == current_tok || end_ptr == NULL) { | |
printf("FATAL: Invalid number: '%s'.\n", current_tok); | |
free(bucket_string); | |
exit(-1); | |
} | |
} | |
free(bucket_string); | |
if (verb) { | |
size_t i; | |
printf("++ Setup %lu buckets {", num_buckets); | |
for (i = 0; i < num_buckets; i++) { | |
printf("%lu,", bucket_sizes[i]); | |
} | |
printf("}\n"); | |
} | |
} | |
static void run_report(int32_t verb, uint16_t nr_threads, uint64_t start_time) | |
{ | |
struct bucket buckets[BUCKETS]; | |
size_t thread; | |
if (verb < 0) | |
return; | |
memset(&buckets, 0, sizeof buckets); | |
for (thread = 0; thread < nr_threads; thread++) { | |
size_t bucket; | |
for (bucket = 0; bucket < BUCKETS; bucket++) | |
buckets[bucket].bytes += byte_ctrs[thread].buckets[bucket].bytes; | |
} | |
report(buckets, verb, start_time); | |
} | |
int main(int argc, char *argv[]) | |
{ | |
char *bucket_string = "64,128,256,512,1024,1500,1800,2048"; | |
uint64_t secs_to_report = 5; | |
enum polling_mechanism pt; | |
uint64_t secs_to_run = 0; | |
uint16_t nr_threads = 1; | |
uint64_t start_time = 0; | |
uint64_t end_time = 0; | |
uint64_t cur_time = 0; | |
bool affinity = false; | |
uint16_t lport = 0; | |
int32_t verb = 0; | |
int opt_ind; | |
parse_arguments(argc, argv, &affinity, &nr_threads, &verb, &secs_to_report, | |
&secs_to_run, &lport, &bucket_string, &pt); | |
if (verb > 0) { | |
printf("++++ Starting listeners on %d reporting every %lus\n" | |
" polling with %d and then stopping", lport, | |
secs_to_report, pt); | |
if (secs_to_run) { | |
printf(" after %lus.\n", secs_to_run); | |
} else { | |
printf(" on kill / CTRL+C\n"); | |
} | |
} | |
setup_buckets(bucket_string, verb); | |
byte_ctrs = calloc(sizeof(struct thread_data), nr_threads); | |
if (!byte_ctrs) { | |
printf("ERROR: Unable to allocate thread buckets.\n"); | |
exit(-1); | |
} | |
if (secs_to_run) | |
end_time = time_ticks_ms() + (secs_to_run * MS_PER_SEC); | |
for (opt_ind = 0; opt_ind < nr_threads; opt_ind++) { | |
if (verb > 0) | |
printf("Spawning thread (%d)...\n", opt_ind + 1); | |
byte_ctrs[opt_ind].verbose = verb; | |
byte_ctrs[opt_ind].port = lport; | |
byte_ctrs[opt_ind].end_time = end_time; | |
if (affinity) | |
byte_ctrs[opt_ind].core_idx = opt_ind + 1; | |
pthread_create(&byte_ctrs[opt_ind].thread_handle, NULL, | |
udp_listener, &byte_ctrs[opt_ind]); | |
} | |
cur_time = time_ticks_ms(); | |
start_time = cur_time; | |
while ((end_time > cur_time) || !end_time) { | |
pend_s(secs_to_report); | |
run_report(verb, nr_threads, start_time); | |
cur_time = time_ticks_ms(); | |
} | |
for (opt_ind = 0; opt_ind < nr_threads; opt_ind++) { | |
pthread_join(byte_ctrs[opt_ind].thread_handle, NULL); | |
} | |
/* Final report - always print. */ | |
run_report(1, nr_threads, start_time); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment