@mooreniemi
Created September 10, 2024 02:54
# Cargo.toml
[package]
name = "gen_conc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
sha2 = "0.10" # For hashing document content (SHA-256)
rand = "0.8"  # Pinned instead of "*" for reproducible builds

// main.rs
use rand::Rng;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

// Message type for the IndexEngine's command channel
pub enum IndexMessage {
    AddDoc { doc_id: String, doc_content: String },
    DeleteDoc { doc_id: String },
}

// Any document store needs to implement this: you bring a doc_id, you get back the doc
pub trait Readable {
    fn read(&self, doc_id: &str) -> Option<String>;
}

// Any search data structure needs to implement this: lookup is by content, without a doc_id
pub trait Searchable {
    fn search(&self, query: &str) -> Option<String>;
}

// An update can be composed from an add and a delete
pub trait Writable {
    fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String>;
    fn delete_document(&self, doc_id: String) -> Result<(), String>;
}

// DocumentStore holds the actual document content; the Mutex is just for illustration
pub struct DocumentStore {
    docs: Mutex<HashMap<String, String>>, // doc_id -> doc_content
}

impl DocumentStore {
    pub fn new() -> Self {
        DocumentStore {
            docs: Mutex::new(HashMap::new()),
        }
    }
}

impl Writable for DocumentStore {
    fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String> {
        let mut docs = self
            .docs
            .lock()
            .map_err(|_| "Failed to lock docstore".to_string())?;
        docs.insert(doc_id, doc_content);
        Ok(())
    }

    fn delete_document(&self, doc_id: String) -> Result<(), String> {
        let mut docs = self
            .docs
            .lock()
            .map_err(|_| "Failed to lock docstore".to_string())?;
        docs.remove(&doc_id);
        Ok(())
    }
}

impl Readable for DocumentStore {
    fn read(&self, doc_id: &str) -> Option<String> {
        let docs = self.docs.lock().unwrap();
        docs.get(doc_id).cloned()
    }
}

// HashStore holds hashes of document content; the Mutex is just for illustration
pub struct HashStore {
    hashes: Mutex<HashMap<String, String>>, // hash_of_doc_content -> doc_id
}

impl HashStore {
    pub fn new() -> Self {
        HashStore {
            hashes: Mutex::new(HashMap::new()),
        }
    }

    pub fn compute_hash(doc_content: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(doc_content.as_bytes());
        let result = hasher.finalize();
        format!("{:x}", result)
    }
}

impl Writable for HashStore {
    fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String> {
        let hash = HashStore::compute_hash(&doc_content);
        let mut hashes = self
            .hashes
            .lock()
            .map_err(|_| "Failed to lock hashstore".to_string())?;
        hashes.insert(hash, doc_id);
        Ok(())
    }

    fn delete_document(&self, doc_id: String) -> Result<(), String> {
        let mut hashes = self
            .hashes
            .lock()
            .map_err(|_| "Failed to lock hashstore".to_string())?;
        // Remove by value (doc_id), since the key is the hash of the content;
        // note this is an O(n) scan
        hashes.retain(|_, v| v != &doc_id);
        Ok(())
    }
}

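// An alternative sketch (not in the original gist): keeping a reverse map
// doc_id -> hash alongside the forward map turns delete into two O(1)
// removals instead of the O(n) `retain` scan above. A single Mutex guards
// both maps so they cannot drift apart. Names here are hypothetical.
#[allow(dead_code)]
struct BidirMaps {
    by_hash: HashMap<String, String>, // hash_of_doc_content -> doc_id
    by_id: HashMap<String, String>,   // doc_id -> hash_of_doc_content
}

#[allow(dead_code)]
pub struct BidirHashStore {
    maps: Mutex<BidirMaps>,
}

#[allow(dead_code)]
impl BidirHashStore {
    pub fn delete_document(&self, doc_id: &str) -> Result<(), String> {
        let mut maps = self
            .maps
            .lock()
            .map_err(|_| "Failed to lock hashstore".to_string())?;
        // Remove the doc_id -> hash entry first, then use the hash for an O(1)
        // removal from the forward map
        if let Some(hash) = maps.by_id.remove(doc_id) {
            maps.by_hash.remove(&hash);
        }
        Ok(())
    }
}
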
impl Searchable for HashStore {
    fn search(&self, doc_content: &str) -> Option<String> {
        let hash = HashStore::compute_hash(doc_content);
        let hashes = self.hashes.lock().unwrap();
        hashes.get(&hash).cloned()
    }
}

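// A minimal test sketch (not part of the original gist) exercising the two
// stores directly; the expected value below is the well-known SHA-256 hex
// digest of "abc" (the standard NIST test vector). Run with `cargo test`.
#[cfg(test)]
mod store_tests {
    use super::*;

    #[test]
    fn compute_hash_matches_known_vector() {
        let expected = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
        assert_eq!(HashStore::compute_hash("abc"), expected);
    }

    #[test]
    fn docstore_round_trip() {
        let store = DocumentStore::new();
        store
            .add_document("d1".to_string(), "hello".to_string())
            .unwrap();
        assert_eq!(store.read("d1"), Some("hello".to_string()));
        store.delete_document("d1".to_string()).unwrap();
        assert_eq!(store.read("d1"), None);
    }

    #[test]
    fn hashstore_content_lookup() {
        let store = HashStore::new();
        store
            .add_document("d1".to_string(), "hello".to_string())
            .unwrap();
        // Lookup is by content, not doc_id
        assert_eq!(store.search("hello"), Some("d1".to_string()));
        store.delete_document("d1".to_string()).unwrap();
        assert_eq!(store.search("hello"), None);
    }
}
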
// IndexEngine coordinates writes across both stores and rolls back on partial
// failure (a compensation pattern in the spirit of two-phase commit)
pub struct IndexEngine<W: Writable + Readable, H: Writable + Searchable> {
    docstore: Arc<W>,
    hashstore: Arc<H>,
    message_rx: mpsc::Receiver<IndexMessage>,
}

impl<W: Writable + Readable, H: Writable + Searchable> IndexEngine<W, H> {
    pub async fn run(mut self) {
        while let Some(message) = self.message_rx.recv().await {
            match message {
                // Add document logic
                IndexMessage::AddDoc {
                    doc_id,
                    doc_content,
                } => {
                    println!("Attempting to add doc: {}", doc_id);
                    // Prepare phase: try to add to both the docstore and the hashstore
                    let docstore_result = self
                        .docstore
                        .add_document(doc_id.clone(), doc_content.clone());
                    let hashstore_result = self
                        .hashstore
                        .add_document(doc_id.clone(), doc_content.clone());
                    // Check for errors and roll back if necessary
                    if let Err(err) = docstore_result {
                        println!("Failed to add to docstore: {}. Rolling back...", err);
                        if hashstore_result.is_ok() {
                            let _ = self.hashstore.delete_document(doc_id.clone());
                        }
                        continue;
                    }
                    if let Err(err) = hashstore_result {
                        println!("Failed to add to hashstore: {}. Rolling back...", err);
                        let _ = self.docstore.delete_document(doc_id.clone());
                        continue;
                    }
                    println!("Successfully added doc {} to both stores", doc_id);
                }
                // Delete document logic
                IndexMessage::DeleteDoc { doc_id } => {
                    println!("Attempting to delete doc: {}", doc_id);
                    // Prepare phase: fetch current values for a potential rollback
                    let current_doc_content = self.docstore.read(&doc_id);
                    if let Some(doc_content) = current_doc_content {
                        let current_hash_content = self.hashstore.search(&doc_content);
                        if current_hash_content.is_none() {
                            println!("Hash not found for doc_id: {}. Aborting delete.", doc_id);
                            continue;
                        }
                        // Try to delete from both stores
                        let docstore_result = self.docstore.delete_document(doc_id.clone());
                        let hashstore_result = self.hashstore.delete_document(doc_id.clone());
                        // Check for errors and roll back if necessary
                        if let Err(err) = docstore_result {
                            println!("Failed to delete from docstore: {}. Rolling back...", err);
                            if let Some(_hash_content) = current_hash_content {
                                let _ = self
                                    .hashstore
                                    .add_document(doc_id.clone(), doc_content.clone());
                            }
                            continue;
                        }
                        if let Err(err) = hashstore_result {
                            println!("Failed to delete from hashstore: {}. Rolling back...", err);
                            let _ = self
                                .docstore
                                .add_document(doc_id.clone(), doc_content.clone());
                            continue;
                        }
                        println!("Successfully deleted doc {} from both stores", doc_id);
                    } else {
                        println!(
                            "Document content not found for doc_id: {}. Aborting delete.",
                            doc_id
                        );
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Create shared DocumentStore and HashStore instances using Arc
    let docstore = Arc::new(DocumentStore::new());
    let hashstore = Arc::new(HashStore::new());

    // Create the message channel for the IndexEngine
    let (message_tx, message_rx) = mpsc::channel(100);

    // Create the IndexEngine with the shared stores and the message receiver
    let index_engine = IndexEngine {
        docstore: Arc::clone(&docstore),
        hashstore: Arc::clone(&hashstore),
        message_rx,
    };

    // Spawn the IndexEngine in a background task; keep the handle so main can
    // wait for the engine to drain the channel before exiting
    let engine_handle = tokio::spawn(index_engine.run());

    // List of documents to add
    let doc_contents = vec![
        ("doc1", "This is the content of document 1"),
        ("doc2", "Here is the content of document 2"),
        ("doc3", "Random content for document 3"),
        ("doc4", "Another document with some text"),
        ("doc5", "Yet another document with different content"),
        ("doc6", "Document number six with some data"),
        ("doc7", "Lucky number seven document"),
        ("doc8", "Document eight, content not too late"),
        ("doc9", "Almost there, document nine"),
        ("doc10", "Finally, document number ten"),
    ];

    // Add, search, and delete tasks, interleaved
    let mut tasks = Vec::new();
    let delete_doc_ids = vec!["doc1", "doc2"];
    for (_i, (doc_id, doc_content)) in doc_contents.clone().into_iter().enumerate() {
        let message_tx_clone = message_tx.clone();
        let docstore_clone = Arc::clone(&docstore);
        let hashstore_clone = Arc::clone(&hashstore);
        // Random delay for interleaving (rand 0.8 takes a single range argument)
        let delay: u64 = rand::thread_rng().gen_range(10..100);

        // Add task
        let doc_id_add = doc_id.to_string();
        let doc_content_add = doc_content.to_string();
        tasks.push(tokio::spawn({
            let message_tx_clone = message_tx_clone.clone(); // Give the closure its own copy of the sender
            async move {
                tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
                println!("Adding document: {}", doc_id_add);
                message_tx_clone
                    .send(IndexMessage::AddDoc {
                        doc_id: doc_id_add,
                        doc_content: doc_content_add,
                    })
                    .await
                    .unwrap();
            }
        }));

        // Search task
        let doc_id_search = doc_id.to_string();
        let doc_content_search = doc_content.to_string();
        tasks.push(tokio::spawn({
            let docstore_clone = Arc::clone(&docstore_clone); // Clone the Arcs for this closure
            let hashstore_clone = Arc::clone(&hashstore_clone);
            async move {
                tokio::time::sleep(tokio::time::Duration::from_millis(delay + 50)).await;
                // Search for the document
                let docstore_read = docstore_clone.read(&doc_id_search);
                println!("Read from docstore: {:?}", docstore_read);
                let hashstore_search = hashstore_clone.search(&doc_content_search);
                println!(
                    "Search from hashstore (content -> doc_id): {:?}",
                    hashstore_search
                );
            }
        }));

        // Delete task for the doc_ids selected in delete_doc_ids above
        if delete_doc_ids.contains(&doc_id) {
            let message_tx_clone = message_tx.clone();
            let doc_id_delete = doc_id.to_string();
            let doc_content_delete = doc_content.to_string();
            tasks.push(tokio::spawn({
                let docstore_clone = Arc::clone(&docstore_clone); // Clone the Arcs for this closure
                let hashstore_clone = Arc::clone(&hashstore_clone);
                async move {
                    tokio::time::sleep(tokio::time::Duration::from_millis(delay + 100)).await;
                    println!("Deleting document: {}", doc_id_delete);
                    message_tx_clone
                        .send(IndexMessage::DeleteDoc {
                            doc_id: doc_id_delete.clone(),
                        })
                        .await
                        .unwrap();
                    // Search after sending the delete. Note that the engine processes
                    // the message asynchronously, so this read races with the delete
                    // and may still observe the document.
                    let docstore_read_after_delete = docstore_clone.read(&doc_id_delete);
                    println!(
                        "Read after delete from docstore: {:?}",
                        docstore_read_after_delete
                    );
                    let hashstore_search_after_delete = hashstore_clone.search(&doc_content_delete);
                    println!(
                        "Search after delete from hashstore (content -> doc_id): {:?}",
                        hashstore_search_after_delete
                    );
                }
            }));
        }
    }

    // Wait for all tasks to complete
    for task in tasks {
        task.await.unwrap();
    }

    // Close the channel so the engine's receive loop ends, then wait for it
    // to finish processing any remaining messages
    drop(message_tx);
    engine_handle.await.unwrap();
    println!("All tasks completed.");
}
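
// A minimal async test sketch (not part of the original gist): dropping the
// sender closes the channel and ends the engine's receive loop, so awaiting
// the engine task guarantees both messages were processed before the
// assertions run.
#[cfg(test)]
mod engine_tests {
    use super::*;

    #[tokio::test]
    async fn engine_applies_add_then_delete() {
        let docstore = Arc::new(DocumentStore::new());
        let hashstore = Arc::new(HashStore::new());
        let (tx, rx) = mpsc::channel(8);
        let engine = IndexEngine {
            docstore: Arc::clone(&docstore),
            hashstore: Arc::clone(&hashstore),
            message_rx: rx,
        };
        let handle = tokio::spawn(engine.run());

        tx.send(IndexMessage::AddDoc {
            doc_id: "d1".to_string(),
            doc_content: "hello".to_string(),
        })
        .await
        .unwrap();
        tx.send(IndexMessage::DeleteDoc {
            doc_id: "d1".to_string(),
        })
        .await
        .unwrap();

        // Close the channel and wait for the engine to drain it
        drop(tx);
        handle.await.unwrap();

        // Both stores should have applied the add and then the delete
        assert!(docstore.read("d1").is_none());
        assert!(hashstore.search("hello").is_none());
    }
}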