Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Created September 4, 2024 20:54
Show Gist options
  • Save matthewjberger/0e94c0c6e3a42a8a33b75fc9c8fe95cc to your computer and use it in GitHub Desktop.
Save matthewjberger/0e94c0c6e3a42a8a33b75fc9c8fe95cc to your computer and use it in GitHub Desktop.
In-process pub/sub message broker that needs no async executor and uses only `Send + Sync`-compatible types
use std::{
collections::HashMap,
sync::{Arc, RwLock, Weak},
};
use std::{
collections::VecDeque,
sync::{RwLockReadGuard, RwLockWriteGuard},
};
use uuid::Uuid;
/// Shared, lock-guarded handle to a [`Client`]: an `Arc<RwLock<Client<T>>>`,
/// which is the same type that [`Client::new`] returns.
pub type ClientHandle<T> = Arc<RwLock<Client<T>>>;
/// A subscriber that buffers messages of type `T` in its own queue.
///
/// The queue sits behind its own `RwLock`, so messages can be pushed and
/// popped through a shared `&Client` reference.
pub struct Client<T: Clone> {
// Identifier compared by `Broker::unsubscribe` to find this client.
id: Uuid,
// Pending messages, oldest at the front.
event_queue: RwLock<VecDeque<T>>,
// Intended maximum number of queued messages; `Broker::publish` drops the
// oldest message once the queue is this full.
ring_buffer_size: usize,
}
impl<T: Clone> Default for Client<T> {
    /// Builds a client with an id obtained from `Uuid::new_v4()`, an empty
    /// event queue, and a ring buffer size of 100.
    fn default() -> Self {
        let id = Uuid::new_v4();
        let event_queue = RwLock::new(VecDeque::new());
        let ring_buffer_size = 100;
        Self {
            id,
            event_queue,
            ring_buffer_size,
        }
    }
}
impl<T: Clone> Client<T> {
    /// Creates a default client and wraps it in `Arc<RwLock<_>>` so it can be
    /// handed to a broker and shared.
    pub fn new() -> Arc<RwLock<Self>> {
        let client = Self::default();
        Arc::new(RwLock::new(client))
    }

    /// Read access to the pending messages.
    ///
    /// Returns `None` if the queue lock is poisoned.
    pub fn event_queue(&self) -> Option<RwLockReadGuard<VecDeque<T>>> {
        let locked = self.event_queue.read();
        locked.ok()
    }

    /// Write access to the pending messages.
    ///
    /// Returns `None` if the queue lock is poisoned.
    pub fn event_queue_mut(&self) -> Option<RwLockWriteGuard<VecDeque<T>>> {
        let locked = self.event_queue.write();
        locked.ok()
    }

    /// Like [`Client::new`], but with `size` as the ring buffer size instead
    /// of the default.
    pub fn with_ring_buffer_size(size: usize) -> Arc<RwLock<Self>> {
        let client = Self {
            ring_buffer_size: size,
            ..Self::default()
        };
        Arc::new(RwLock::new(client))
    }

    /// This client's identifier.
    pub fn id(&self) -> Uuid {
        self.id
    }

    /// The configured ring buffer size.
    pub fn ring_buffer_size(&self) -> usize {
        self.ring_buffer_size
    }

    /// Removes and returns the oldest pending message.
    ///
    /// Returns `None` when the queue is empty or its lock is poisoned.
    pub fn next_message(&self) -> Option<T> {
        let mut queue = self.event_queue.write().ok()?;
        queue.pop_front()
    }

    /// Returns a clone of the oldest pending message without removing it.
    ///
    /// Returns `None` when the queue is empty or its lock is poisoned.
    pub fn peek_message(&self) -> Option<T> {
        let queue = self.event_queue.read().ok()?;
        queue.front().cloned()
    }
}
/// Topic-based message broker that fans messages out to subscribed clients.
///
/// Subscribers are held as `Weak` references, so the broker does not keep a
/// client alive once every strong handle to it has been dropped.
pub struct Broker<T: Clone> {
    /// Topic name mapped to the weak handles of the clients subscribed to it.
    subscribers: HashMap<String, Vec<Weak<RwLock<Client<T>>>>>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound, making `Broker::<T>::default()` unavailable for message
// types that are only `Clone`, even though an empty broker never needs a `T`.
impl<T: Clone> Default for Broker<T> {
    /// Creates a broker with no topics and no subscribers.
    fn default() -> Self {
        Self {
            subscribers: HashMap::new(),
        }
    }
}
impl<T: Clone> Broker<T> {
    /// Creates a broker with no topics and no subscribers.
    pub fn new() -> Self {
        Self {
            subscribers: HashMap::new(),
        }
    }

    /// Subscribes `client` to `topic`, creating the topic entry on first use.
    ///
    /// Only a weak reference is stored. Each call appends a registration, so
    /// subscribing the same client to the same topic twice registers it twice.
    pub fn subscribe(&mut self, topic: &str, client: &Arc<RwLock<Client<T>>>) {
        self.subscribers
            .entry(topic.to_string())
            .or_default()
            .push(Arc::downgrade(client));
    }

    /// Removes every registration on `topic` whose client id equals
    /// `client_id`.
    ///
    /// Registrations whose client has been dropped, or whose lock is
    /// poisoned, are removed as well. The topic entry itself is kept even if
    /// its subscriber list becomes empty.
    ///
    /// Each live subscriber is read-locked while its id is compared, so the
    /// caller should not hold a guard on a subscribed client during the call.
    ///
    /// # Errors
    ///
    /// Returns `Err("TopicNotFound")` when the broker has no entry for
    /// `topic`.
    pub fn unsubscribe(&mut self, topic: &str, client_id: Uuid) -> Result<(), &'static str> {
        let subscribers = self.subscribers.get_mut(topic).ok_or("TopicNotFound")?;
        subscribers.retain(|registration| {
            let handle = match registration.upgrade() {
                Some(handle) => handle,
                None => return false,
            };
            // Bound to a local so the lock guard is released before `handle`
            // goes out of scope.
            let keep = match handle.read() {
                Ok(client) => client.id() != client_id,
                Err(_) => false,
            };
            keep
        });
        Ok(())
    }

    /// Delivers a clone of `message` to every live subscriber of `topic`.
    ///
    /// Each subscriber's queue is bounded by its ring buffer size: the oldest
    /// messages are discarded to make room, and a subscriber whose size is
    /// `0` retains nothing. Registrations whose client has been dropped, or
    /// whose lock is poisoned, are pruned, and the topic entry is removed
    /// once no registrations remain. Publishing to an unknown topic is a
    /// no-op.
    ///
    /// Each subscriber is write-locked during delivery, so the caller should
    /// not hold a guard on a subscribed client during the call.
    pub fn publish(&mut self, topic: &str, message: T) {
        if let Some(subscribers) = self.subscribers.get_mut(topic) {
            subscribers.retain(|registration| {
                let handle = match registration.upgrade() {
                    Some(handle) => handle,
                    // The client was dropped; forget the registration.
                    None => return false,
                };
                // Bound to a local so the lock guard is released before
                // `handle` goes out of scope.
                let keep = match handle.write() {
                    Ok(client) => {
                        Self::deliver(&client, &message);
                        true
                    }
                    // Poisoned client lock: drop the registration.
                    Err(_) => false,
                };
                keep
            });

            if subscribers.is_empty() {
                self.subscribers.remove(topic);
            }
        }
    }

    /// Appends a clone of `message` to `client`'s queue while keeping the
    /// queue within the client's ring buffer size.
    ///
    /// The queue lock is taken once, so trimming and appending happen under
    /// the same guard. A poisoned queue lock skips the delivery.
    fn deliver(client: &Client<T>, message: &T) {
        let capacity = client.ring_buffer_size();
        let mut queue = match client.event_queue_mut() {
            Some(queue) => queue,
            None => return,
        };

        if capacity == 0 {
            // A zero-sized ring buffer cannot hold any message.
            queue.clear();
            return;
        }

        // `>=` rather than `==` so a queue that was filled past its bound by
        // other means is trimmed back as well.
        while queue.len() >= capacity {
            queue.pop_front();
        }
        queue.push_back(message.clone());
    }
}
/// Unit tests covering delivery, unsubscription, ring-buffer eviction,
/// weak-reference cleanup, and peeking.
#[cfg(test)]
mod tests {
    use super::{Broker, Client};

    /// Simple cloneable payload used as the message type in these tests.
    #[derive(Debug, Clone, PartialEq)]
    pub struct Message {
        content: String,
    }

    impl Message {
        pub fn new(content: &str) -> Self {
            Self {
                content: content.to_string(),
            }
        }
    }

    #[test]
    fn test_single_client_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(
            client1.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    #[test]
    fn test_multiple_subscribers_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(
            client1.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
        assert_eq!(
            client2.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    #[test]
    fn test_unsubscribe() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);
        // Read the id in its own statement so the read guard is released
        // before `unsubscribe` locks the same client. Passed inline as an
        // argument, the guard temporary would stay alive for the whole call,
        // and re-locking an `RwLock` already held by the current thread may
        // panic or deadlock.
        let client1_id = client1.read().unwrap().id();
        broker.unsubscribe("topic1", client1_id).unwrap();
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(client1.read().unwrap().next_message(), None);
        assert_eq!(
            client2.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    #[test]
    fn test_multiple_topics() {
        let mut broker = Broker::new();
        let client = Client::new();
        broker.subscribe("topic1", &client);
        broker.subscribe("topic2", &client);
        broker.publish("topic1", Message::new("hello topic1"));
        broker.publish("topic2", Message::new("hello topic2"));
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "hello topic1"
        );
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "hello topic2"
        );
    }

    #[test]
    fn test_ring_buffer() {
        let mut broker = Broker::new();
        let client = Client::with_ring_buffer_size(2); // set ring buffer size to 2
        broker.subscribe("topic1", &client);
        broker.publish("topic1", Message::new("message1"));
        broker.publish("topic1", Message::new("message2"));
        broker.publish("topic1", Message::new("message3"));
        // Expecting the oldest message to be discarded due to ring buffer
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "message2"
        );
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "message3"
        );
    }

    #[test]
    fn usage_example() {
        // Create a new broker
        let mut broker = Broker::new();
        // Create a client with a specified ring buffer size
        let client = Client::with_ring_buffer_size(5);
        // Subscribe the client to a topic
        broker.subscribe("news", &client);
        // The broker publishes a message to the topic
        broker.publish("news", Message::new("Breaking news!"));
        // The client retrieves the message from its ring buffer
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "Breaking news!"
        );
    }

    #[test]
    fn test_weak_reference_cleanup() {
        let mut broker = Broker::new();
        // Subscribe a client to a topic
        {
            let client = Client::new();
            broker.subscribe("topic1", &client);
            // Ensure there's a subscriber for "topic1"
            assert!(broker.subscribers.contains_key("topic1"));
            // Simulating a message publish
            broker.publish("topic1", Message::new("Test"));
            // Ensure the client received the message
            assert_eq!(
                client.read().unwrap().next_message().unwrap().content,
                "Test"
            );
        } // Client goes out of scope here and should be dropped

        // The weak reference to the client should not be upgradeable now
        if let Some(subscribers) = broker.subscribers.get("topic1") {
            assert!(subscribers[0].upgrade().is_none());
        } else {
            panic!("Topic 'topic1' should still exist at this point.");
        }
        // Simulating another publish to trigger the weak reference cleanup
        broker.publish("topic1", Message::new("Test 2"));
        // Check if weak reference cleanup worked by checking the subscribers for "topic1"
        assert!(!broker.subscribers.contains_key("topic1"));
    }

    #[test]
    fn test_peek_message() {
        let mut broker = Broker::new();
        let client = Client::new();
        broker.subscribe("topic1", &client);
        broker.publish("topic1", Message::new("peek this"));
        // Peek the message
        assert_eq!(
            client.read().unwrap().peek_message().unwrap().content,
            "peek this"
        );
        // Check if the message is still in the queue
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "peek this"
        );
        // Ensure the message queue is now empty after calling `next_message`
        assert!(client.read().unwrap().next_message().is_none());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment