Created
September 10, 2024 02:54
-
-
Save mooreniemi/bb528903d5a8d9ae77304eaeb0b648ad to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "gen_conc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
sha2 = "0.10" # For hashing document content (SHA-256)
# Pinned instead of "*": rand's gen_range API changed incompatibly across
# releases (two-arg form before 0.8, range form `low..high` in 0.8, renamed
# APIs in 0.9), so a wildcard makes the build non-reproducible and can pull
# a version the source does not compile against.
rand = "0.8"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rand::Rng; | |
use sha2::{Digest, Sha256}; | |
use std::collections::HashMap; | |
use std::sync::{Arc, Mutex}; | |
use tokio::sync::mpsc; | |
// Message type for IndexEngine
/// Commands accepted by the `IndexEngine` worker loop.
/// `Debug` is derived so messages can be logged when diagnosing the engine.
#[derive(Debug)]
pub enum IndexMessage {
    /// Insert (or overwrite) a document in both stores.
    AddDoc { doc_id: String, doc_content: String },
    /// Remove a document from both stores by id.
    DeleteDoc { doc_id: String },
}
// Any DocumentStore would need to implement this, you bring a doc_id, you get back doc
/// Read-side lookup: resolve a document id to its stored content.
pub trait Readable {
    /// Returns the content stored under `doc_id`, or `None` when the id is unknown.
    fn read(&self, doc_id: &str) -> Option<String>;
}
// Any search data structure would need to implement this, you don't have doc_id for lookup
/// Reverse lookup: resolve a query to the id of a matching document.
pub trait Searchable {
    /// Returns the doc_id of a match, or `None` when nothing matches.
    /// NOTE(review): the `HashStore` impl treats `query` as the *entire document
    /// content* (exact SHA-256 match), not a free-text query — confirm intent.
    fn search(&self, query: &str) -> Option<String>;
}
// Update can be created from add and delete
/// Write-side mutation: insert and remove documents. Both operations take
/// ownership of their `String` arguments and report failures as `Err(message)`.
pub trait Writable {
    /// Inserts `doc_content` under `doc_id`, replacing any existing entry.
    fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String>;
    /// Removes the entry for `doc_id`; implementations in this file treat a
    /// missing id as success (idempotent delete).
    fn delete_document(&self, doc_id: String) -> Result<(), String>;
}
// DocumentStore for storing actual document content, obv Mutex is just example
/// In-memory primary store: maps each document id to its full content.
pub struct DocumentStore {
    docs: Mutex<HashMap<String, String>>, // doc_id -> doc_content
}
impl DocumentStore { | |
pub fn new() -> Self { | |
DocumentStore { | |
docs: Mutex::new(HashMap::new()), | |
} | |
} | |
} | |
impl Writable for DocumentStore { | |
fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String> { | |
let mut docs = self | |
.docs | |
.lock() | |
.map_err(|_| "Failed to lock docstore".to_string())?; | |
docs.insert(doc_id, doc_content); | |
Ok(()) | |
} | |
fn delete_document(&self, doc_id: String) -> Result<(), String> { | |
let mut docs = self | |
.docs | |
.lock() | |
.map_err(|_| "Failed to lock docstore".to_string())?; | |
docs.remove(&doc_id); | |
Ok(()) | |
} | |
} | |
impl Readable for DocumentStore { | |
fn read(&self, doc_id: &str) -> Option<String> { | |
let docs = self.docs.lock().unwrap(); | |
docs.get(doc_id).cloned() | |
} | |
} | |
// HashStore for storing hashes of document content, obv Mutex is just example
/// Secondary index: maps the SHA-256 hex digest of a document's content back
/// to the owning doc_id, enabling exact-content lookup without the id.
pub struct HashStore {
    hashes: Mutex<HashMap<String, String>>, // hash_of_doc_content -> doc_id
}
impl HashStore { | |
pub fn new() -> Self { | |
HashStore { | |
hashes: Mutex::new(HashMap::new()), | |
} | |
} | |
pub fn compute_hash(doc_content: &str) -> String { | |
let mut hasher = Sha256::new(); | |
hasher.update(doc_content.as_bytes()); | |
let result = hasher.finalize(); | |
format!("{:x}", result) | |
} | |
} | |
impl Writable for HashStore { | |
fn add_document(&self, doc_id: String, doc_content: String) -> Result<(), String> { | |
let hash = HashStore::compute_hash(&doc_content); | |
let mut hashes = self | |
.hashes | |
.lock() | |
.map_err(|_| "Failed to lock hashstore".to_string())?; | |
hashes.insert(hash, doc_id); | |
Ok(()) | |
} | |
fn delete_document(&self, doc_id: String) -> Result<(), String> { | |
let mut hashes = self | |
.hashes | |
.lock() | |
.map_err(|_| "Failed to lock hashstore".to_string())?; | |
// Remove entry by value (doc_id) since the key is now the hash of the content | |
hashes.retain(|_, v| v != &doc_id); | |
Ok(()) | |
} | |
} | |
impl Searchable for HashStore { | |
fn search(&self, doc_content: &str) -> Option<String> { | |
let hash = HashStore::compute_hash(doc_content); | |
let hashes = self.hashes.lock().unwrap(); | |
hashes.get(&hash).cloned() | |
} | |
} | |
// IndexEngine for handling two-phase commit operations
/// Worker that keeps a primary document store and a content-hash index in
/// sync, applying `IndexMessage`s received over an mpsc channel one at a time.
pub struct IndexEngine<W: Writable + Readable, H: Writable + Searchable> {
    docstore: Arc<W>,                         // primary: doc_id -> content
    hashstore: Arc<H>,                        // secondary: content hash -> doc_id
    message_rx: mpsc::Receiver<IndexMessage>, // inbound add/delete commands
}
impl<W: Writable + Readable, H: Writable + Searchable> IndexEngine<W, H> {
    /// Drives the engine: consumes messages until every sender is dropped,
    /// applying each add/delete to both stores with best-effort compensation
    /// (undo) when exactly one store fails.
    ///
    /// NOTE(review): despite the "two-phase commit" naming, this is
    /// compensation-based, not true 2PC — there is no prepare/vote step, and a
    /// failure during the rollback itself is ignored (`let _ =`), which can
    /// leave the two stores inconsistent. Confirm whether that is acceptable.
    pub async fn run(mut self) {
        while let Some(message) = self.message_rx.recv().await {
            match message {
                // Add document logic
                IndexMessage::AddDoc {
                    doc_id,
                    doc_content,
                } => {
                    println!("Attempting to add doc: {}", doc_id);
                    // Prepare Phase: Try to add to both docstore and hashstore
                    // (both writes are attempted before either result is checked).
                    let docstore_result = self
                        .docstore
                        .add_document(doc_id.clone(), doc_content.clone());
                    let hashstore_result = self
                        .hashstore
                        .add_document(doc_id.clone(), doc_content.clone());
                    // Check for errors and handle rollback if necessary
                    if let Err(err) = docstore_result {
                        println!("Failed to add to docstore: {}. Rolling back...", err);
                        // Undo the hashstore write only if it actually succeeded.
                        if hashstore_result.is_ok() {
                            let _ = self.hashstore.delete_document(doc_id.clone());
                        }
                        continue;
                    }
                    if let Err(err) = hashstore_result {
                        println!("Failed to add to hashstore: {}. Rolling back...", err);
                        // Docstore write succeeded (checked above); undo it.
                        let _ = self.docstore.delete_document(doc_id.clone());
                        continue;
                    }
                    println!("Successfully added doc {} to both stores", doc_id);
                }
                // Delete document logic
                IndexMessage::DeleteDoc { doc_id } => {
                    println!("Attempting to delete doc: {}", doc_id);
                    // Prepare Phase: Fetch current values for potential rollback
                    let current_doc_content = self.docstore.read(&doc_id);
                    if let Some(doc_content) = current_doc_content {
                        // Confirm the hashstore also knows this content before
                        // touching either store, so a rollback re-add is possible.
                        let current_hash_content = self.hashstore.search(&doc_content);
                        if current_hash_content.is_none() {
                            println!("Hash not found for doc_id: {}. Aborting delete.", doc_id);
                            continue;
                        }
                        // Try to delete from both stores
                        let docstore_result = self.docstore.delete_document(doc_id.clone());
                        let hashstore_result = self.hashstore.delete_document(doc_id.clone());
                        // Check for errors and handle rollback if necessary
                        if let Err(err) = docstore_result {
                            println!("Failed to delete from docstore: {}. Rolling back...", err);
                            // Re-insert into the hashstore; insert overwrites, so
                            // this is safe even if the hashstore delete also failed.
                            if let Some(_hash_content) = current_hash_content {
                                let _ = self
                                    .hashstore
                                    .add_document(doc_id.clone(), doc_content.clone());
                            }
                            continue;
                        }
                        if let Err(err) = hashstore_result {
                            println!("Failed to delete from hashstore: {}. Rolling back...", err);
                            // Docstore delete succeeded; restore the document.
                            let _ = self
                                .docstore
                                .add_document(doc_id.clone(), doc_content.clone());
                            continue;
                        }
                        println!("Successfully deleted doc {} from both stores", doc_id);
                    } else {
                        println!(
                            "Document content not found for doc_id: {}. Aborting delete.",
                            doc_id
                        );
                    }
                }
            }
        }
    }
}
#[tokio::main] | |
async fn main() { | |
// Create shared DocumentStore and HashStore instances using Arc | |
let docstore = Arc::new(DocumentStore::new()); | |
let hashstore = Arc::new(HashStore::new()); | |
// Create the message channel for IndexEngine | |
let (message_tx, message_rx) = mpsc::channel(100); | |
// Create the IndexEngine with shared resources and message receiver | |
let index_engine = IndexEngine { | |
docstore: Arc::clone(&docstore), | |
hashstore: Arc::clone(&hashstore), | |
message_rx, | |
}; | |
// Spawn the IndexEngine in a background task | |
tokio::spawn(index_engine.run()); | |
// List of documents to add | |
let doc_contents = vec![ | |
("doc1", "This is the content of document 1"), | |
("doc2", "Here is the content of document 2"), | |
("doc3", "Random content for document 3"), | |
("doc4", "Another document with some text"), | |
("doc5", "Yet another document with different content"), | |
("doc6", "Document number six with some data"), | |
("doc7", "Lucky number seven document"), | |
("doc8", "Document eight, content not too late"), | |
("doc9", "Almost there, document nine"), | |
("doc10", "Finally, document number ten"), | |
]; | |
// Add, search, and delete tasks interleaved | |
let mut tasks = Vec::new(); | |
let delete_doc_ids = vec!["doc1", "doc2"]; | |
for (_i, (doc_id, doc_content)) in doc_contents.clone().into_iter().enumerate() { | |
let message_tx_clone = message_tx.clone(); | |
let docstore_clone = Arc::clone(&docstore); | |
let hashstore_clone = Arc::clone(&hashstore); | |
// Random delay for interleaving | |
let delay: u64 = rand::thread_rng().gen_range(10, 100); | |
// Add task | |
let doc_id_add = doc_id.to_string(); | |
let doc_content_add = doc_content.to_string(); | |
tasks.push(tokio::spawn({ | |
let message_tx_clone = message_tx_clone.clone(); // Ensure the closure has its own copy of the cloned value | |
async move { | |
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await; | |
println!("Adding document: {}", doc_id_add); | |
message_tx_clone | |
.send(IndexMessage::AddDoc { | |
doc_id: doc_id_add, | |
doc_content: doc_content_add, | |
}) | |
.await | |
.unwrap(); | |
} | |
})); | |
// Search task | |
let doc_id_search = doc_id.to_string(); | |
let doc_content_search = doc_content.to_string(); | |
tasks.push(tokio::spawn({ | |
let docstore_clone = Arc::clone(&docstore_clone); // Clone Arc inside the closure | |
let hashstore_clone = Arc::clone(&hashstore_clone); // Clone Arc inside the closure | |
async move { | |
tokio::time::sleep(tokio::time::Duration::from_millis(delay + 50)).await; | |
// Search for the document | |
let docstore_read = docstore_clone.read(&doc_id_search); | |
println!("Read from docstore: {:?}", docstore_read); | |
let hashstore_search = hashstore_clone.search(&doc_content_search); | |
println!( | |
"Search from hashstore (content -> doc_id): {:?}", | |
hashstore_search | |
); | |
} | |
})); | |
// Delete task for even indexed documents | |
if delete_doc_ids.contains(&doc_id) { | |
let message_tx_clone = message_tx.clone(); | |
let doc_id_delete = doc_id.to_string(); | |
let doc_content_delete = doc_content.to_string(); | |
tasks.push(tokio::spawn({ | |
let docstore_clone = Arc::clone(&docstore_clone); // Clone Arc inside the closure | |
let hashstore_clone = Arc::clone(&hashstore_clone); // Clone Arc inside the closure | |
async move { | |
tokio::time::sleep(tokio::time::Duration::from_millis(delay + 100)).await; | |
println!("Deleting document: {}", doc_id_delete); | |
message_tx_clone | |
.send(IndexMessage::DeleteDoc { | |
doc_id: doc_id_delete.clone(), | |
}) | |
.await | |
.unwrap(); | |
// Final search after delete to confirm | |
let docstore_read_after_delete = docstore_clone.read(&doc_id_delete); | |
println!( | |
"Read after delete from docstore: {:?}", | |
docstore_read_after_delete | |
); | |
let hashstore_search_after_delete = hashstore_clone.search(&doc_content_delete); | |
println!( | |
"Search after delete from hashstore (content -> doc_id): {:?}", | |
hashstore_search_after_delete | |
); | |
} | |
})); | |
} | |
} | |
// Wait for all tasks to complete | |
for task in tasks { | |
task.await.unwrap(); | |
} | |
println!("All tasks completed."); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment